[GitHub] spark issue #16476: [SPARK-19084][SQL] Implement expression field

2018-07-16 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/16476
  
 @HyukjinKwon Done, thanks : )
Ping @maropu 


---




[GitHub] spark issue #16476: [SPARK-19084][SQL] Implement expression field

2018-06-26 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/16476
  
@maropu Sure, I will update it this week.


---




[GitHub] spark pull request #19755: [SPARK-22524][SQL] Subquery shows reused on UI SQ...

2018-06-10 Thread gczsjdy
Github user gczsjdy closed the pull request at:

https://github.com/apache/spark/pull/19755


---




[GitHub] spark issue #20809: [SPARK-23667][CORE] Better scala version check

2018-05-20 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/20809
  
@vanzin Sorry for the late reply. According to the call stack, this is the 
first place that calls `getScalaVersion`; `isTest` is true, so we go down that 
path.
This happens on Travis.


---




[GitHub] spark issue #20809: [SPARK-23667][CORE] Better scala version check

2018-05-13 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/20809
  
@vanzin Sorry, but I will update it next week, thanks.


---




[GitHub] spark pull request #21022: Fpga acc

2018-04-10 Thread gczsjdy
Github user gczsjdy closed the pull request at:

https://github.com/apache/spark/pull/21022


---




[GitHub] spark issue #21022: Fpga acc

2018-04-10 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/21022
  
@HyukjinKwon Sorry, it is.


---




[GitHub] spark pull request #21022: Fpga acc

2018-04-10 Thread gczsjdy
GitHub user gczsjdy opened a pull request:

https://github.com/apache/spark/pull/21022

Fpga acc



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gczsjdy/spark fpga_acc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21022


commit 7610bf07c16a8ff6d0b25072c43cb1811525a0cc
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-05T15:10:24Z

Add framework for FPGA projection

commit 9e9d2b10cc0274634873f5193580dad86a74c82f
Author: Guo <chenzhao.guo@...>
Date:   2017-07-05T17:24:08Z

Implement Row=>FPGABatch adapter

commit 5911f7451b8c7c6d028e4dbf40de603e2c3eec51
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-06T05:10:11Z

Implement FPGABatch=>Row adapter

commit 7f858bac018dbeb0c33bd254058b50b642a1ddc4
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-06T05:21:00Z

Sum all rows in the same partition to 1 ByteBuffer

commit e8d33622cec06ff61c0c6991de93b5f0be68658e
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-07T03:46:36Z

Fix bugs and refactor

commit 77c0eba7de191146c837d9868583d6902333fb0a
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-07T03:48:16Z

Add test cases

commit b349b428beb53b34efbb55e02e83b4afbb0ffbb1
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-07T06:16:07Z

Not use RDD[ByteBuffer] as transfer

commit 8a1c5f456f17c7dd140ce0b5bda977c377120ecf
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-07T06:19:53Z

Update test

commit 3057ed32d127afaa1709ab455518b4f7294743e9
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-07T06:21:52Z

Refactor: split different process

commit 50b16f0b587f1f3bca052ff44b4f5af203492d98
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-07T06:51:37Z

Add String null trans code and test

commit 981bc7d011fdbfc5e07f87f21455f144372ae8bd
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-07T07:01:49Z

Add insert arbitrary bytes code for FPGA's need, comment it for future ease 
of test

commit 0adf36f4e4c16a01f57a38c6d6b314fd7308f557
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-07T07:42:08Z

Add CMCC output Schema

commit cc9166b58a745b5a90cb829e9f6a98c76e2f466a
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-10T02:02:36Z

Load native SQL FPGA engine

commit 83609ebf11d57fb274ed526336ef072a9f7a9599
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-10T07:02:23Z

Remove not used bug code

commit 6fb740f5bdecfe413df8c84080a17840d9996e20
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-10T07:04:13Z

Cut off excess chars for FPGA

commit 986010251ca096986c67c4984ea1fc6e10749f9d
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-10T07:09:41Z

Add Java class calling FPGA code, not doing anything in FPGA engine for the 
ease of unit test

commit 3e97550bc6b2da7baedfdd7545335aeae6be3520
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-13T08:19:07Z

Call FPGA mocker, replace testOutputSchema with real outputSchema

commit 2670c2cc91ec85944afe300e84c4068418fff3b6
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-13T12:05:48Z

Refresh OutputSchema and add reserved bytes to FPGA output

commit 8d94f2d584fa45cbed10c1e9e6eee47b5ae2be00
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-13T12:26:39Z

Refactor readBytesFrom buffer to be consistent with put

commit 56dbeab3a730a839b0743fdeb56946eeb60ec929
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-14T01:40:44Z

Add CMCC test case, and convert to new parquet code

commit f8514c7149c22a3f3f3f7d578d16734c453792df
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-14T05:05:42Z

Refresh test code

commit 87ecb423359a3d92ffd9c25ef2d85e2f1c3e4758
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-14T05:06:10Z

Force FileScan read all columns

commit c137ab3ed84d1e1a1e0be4713363b460354bc400
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-20T04:14:55Z

Reuse UnsafeRowWriter & BufferHolder

commit 3d272160581aa85399cb73cadf8015e3e8e18e24
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-20T04:26:45Z

Reduce steps load string column in row to buffer

commit 7a90c4d39735a9516462e7c621d750c8559b91e2
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-25T03:07:00Z

Return buffer to FPGA after use

commit 7a96f45f94c98d1e02fdb492198895c0535bbf98
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-07-25T04:00:22Z

Update FPGA JNI wrapper

commit 8938726ba854d97cdd7f3edcc323c44739c84d25
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2017-08-01T07:49:34Z


[GitHub] spark pull request #20844: [SPARK-23707][SQL] Don't need shuffle exchange wi...

2018-03-31 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20844#discussion_r178437551
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,7 @@ class ConfigBehaviorSuite extends QueryTest with 
SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 1
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id)
+  val data = spark.range(0, n, 1, 2).sort('id)
--- End diff --

Got it now. : )


---




[GitHub] spark pull request #20844: [SPARK-23707][SQL] Don't need shuffle exchange wi...

2018-03-26 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20844#discussion_r177311524
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,7 @@ class ConfigBehaviorSuite extends QueryTest with 
SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 1
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id)
+  val data = spark.range(0, n, 1, 2).sort('id)
--- End diff --

Why change this?


---




[GitHub] spark issue #20809: [SPARK-23667][CORE] Better scala version check

2018-03-19 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/20809
  
@vanzin Thanks. : )
I am testing [OAP](https://github.com/Intel-bigdata/OAP) against a 
pre-built Spark on `LocalClusterMode`.
This is on Travis and no `SPARK_HOME` is set.
The `mvn test` command produces this error:

```
23:49:56.997 ERROR org.apache.spark.deploy.worker.ExecutorRunner: Error running executor
java.lang.IllegalStateException: Cannot find any build directories.
    at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:248)
    at org.apache.spark.launcher.AbstractCommandBuilder.getScalaVersion(AbstractCommandBuilder.java:241)
    at org.apache.spark.launcher.AbstractCommandBuilder.buildClassPath(AbstractCommandBuilder.java:147)
    at org.apache.spark.launcher.AbstractCommandBuilder.buildJavaCommand(AbstractCommandBuilder.java:118)
    at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:39)
    at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:47)
    at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:63)
    at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:51)
    at org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:145)
    at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)
```
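
For context, a rough sketch of the setup that hits this path (an assumption based on the description above, not taken from the OAP test code):

```scala
// Hypothetical reproduction sketch: a test that runs against a pre-built Spark
// distribution in local-cluster mode, with no Spark source tree on disk.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local-cluster[2, 1, 1024]")
  .appName("oap-local-cluster-test")
  .getOrCreate()

// Each worker builds its executor command through WorkerCommandBuilder, which
// calls getScalaVersion(); when the launcher build directories are not present
// (pre-built Spark, no source tree), it fails with
// "Cannot find any build directories." as in the stack trace above.
```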


---




[GitHub] spark issue #20809: [SPARK-23667][CORE] Better scala version check

2018-03-16 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/20809
  
@viirya Yes, but that only works for people who are going to dig into the 
Spark code, and it also requires manual effort. Isn't it better if we get this 
automatically?


---




[GitHub] spark issue #20809: [SPARK-23667][CORE] Better scala version check

2018-03-15 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/20809
  
cc @cloud-fan @viirya 


---




[GitHub] spark pull request #20809: [CORE] Better scala version check

2018-03-13 Thread gczsjdy
GitHub user gczsjdy opened a pull request:

https://github.com/apache/spark/pull/20809

[CORE] Better scala version check

## What changes were proposed in this pull request?

In some cases, when an outer project uses pre-built Spark as a dependency, 
`getScalaVersion` fails because the `launcher` directory doesn't exist. This PR 
makes it also check the `jars` directory.
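
As a rough illustration of the idea (a hedged sketch in Scala; the actual change lives in the Java launcher code, so the helper below is an assumption, not the PR's code):

```scala
import java.io.File

// Fallback sketch: if the source-tree launcher directories are missing, infer the
// Scala version from the scala-library jar shipped in the distribution's jars/ dir.
def scalaVersionFromJars(sparkHome: String): Option[String] = {
  val jarsDir = new File(sparkHome, "jars")
  Option(jarsDir.listFiles()).toSeq.flatten
    .map(_.getName)
    .collectFirst { case name if name.startsWith("scala-library-") =>
      // e.g. "scala-library-2.11.8.jar" -> "2.11"
      name.stripPrefix("scala-library-").split("\\.").take(2).mkString(".")
    }
}
```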

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gczsjdy/spark scala_version_check

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20809


commit 49c6198e9bfd2de986ca2bb9d7fc153f1b4c6be0
Author: GuoChenzhao <chenzhao.guo@...>
Date:   2018-03-13T08:13:04Z

Better scala version check




---




[GitHub] spark pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive ...

2018-01-26 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20303#discussion_r164089610
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala
 ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.MapOutputStatistics
+import org.apache.spark.broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * In adaptive execution mode, an execution plan is divided into multiple 
QueryStages. Each
+ * QueryStage is a sub-tree that runs in a single stage.
+ */
+abstract class QueryStage extends UnaryExecNode {
+
+  var child: SparkPlan
+
+  // Ignore this wrapper for canonicalizing.
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Execute childStages and wait until all stages are completed. Use a 
thread pool to avoid
+   * blocking on one child stage.
+   */
+  def executeChildStages(): Unit = {
+// Handle broadcast stages
+val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect {
+  case bqs: BroadcastQueryStageInput => bqs.childStage
+}
+val broadcastFutures = broadcastQueryStages.map { queryStage =>
+  Future { queryStage.prepareBroadcast() }(QueryStage.executionContext)
+}
+
+// Submit shuffle stages
+val executionId = 
sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+val shuffleQueryStages: Seq[ShuffleQueryStage] = child.collect {
+  case sqs: ShuffleQueryStageInput => sqs.childStage
+}
+val shuffleStageFutures = shuffleQueryStages.map { queryStage =>
+  Future {
+SQLExecution.withExecutionId(sqlContext.sparkContext, executionId) 
{
+  queryStage.execute()
+}
+  }(QueryStage.executionContext)
+}
+
+ThreadUtils.awaitResult(
+  Future.sequence(broadcastFutures)(implicitly, 
QueryStage.executionContext), Duration.Inf)
+ThreadUtils.awaitResult(
+  Future.sequence(shuffleStageFutures)(implicitly, 
QueryStage.executionContext), Duration.Inf)
+  }
+
+  /**
+   * Before executing the plan in this query stage, we execute all child 
stages, optimize the plan
+   * in this stage and determine the reducer number based on the child 
stages' statistics. Finally
+   * we do a codegen for this query stage and update the UI with the new 
plan.
+   */
+  def prepareExecuteStage(): Unit = {
+// 1. Execute childStages
+executeChildStages()
+// It is possible to optimize this stage's plan here based on the 
child stages' statistics.
+
+// 2. Determine reducer number
+val queryStageInputs: Seq[ShuffleQueryStageInput] = child.collect {
+  case input: ShuffleQueryStageInput => input
+}
+val childMapOutputStatistics = 
queryStageInputs.map(_.childStage.mapOutputStatistics)
+  .filter(_ != null).toArray
+if (childMapOutputStatistics.length > 0) {
+  val exchangeCoordinator = new ExchangeCoordinator(
+conf.targetPostShuffleInputSize,
+conf.minNumPostS

[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2018-01-24 Thread gczsjdy
Github user gczsjdy closed the pull request at:

https://github.com/apache/spark/pull/19862


---




[GitHub] spark issue #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle read less ...

2018-01-24 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19862
  
@cloud-fan Ok, thanks for your time, I will close this.


---




[GitHub] spark pull request #20135: [SPARK-22937][SQL] SQL elt output binary for bina...

2018-01-02 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20135#discussion_r159353652
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ---
@@ -271,33 +271,45 @@ case class ConcatWs(children: Seq[Expression])
   }
 }
 
+/**
+ * An expression that returns the `n`-th input in given inputs.
+ * If all inputs are binary, `elt` returns an output as binary. Otherwise, 
it returns as string.
+ * If any input is null, `elt` returns null.
+ */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(n, str1, str2, ...) - Returns the `n`-th string, e.g., 
returns `str2` when `n` is 2.",
+  usage = "_FUNC_(n, input1, input2, ...) - Returns the `n`-th input, 
e.g., returns `input2` when `n` is 2.",
   examples = """
 Examples:
   > SELECT _FUNC_(1, 'scala', 'java');
scala
   """)
 // scalastyle:on line.size.limit
-case class Elt(children: Seq[Expression])
-  extends Expression with ImplicitCastInputTypes {
+case class Elt(children: Seq[Expression]) extends Expression {
 
   private lazy val indexExpr = children.head
-  private lazy val stringExprs = children.tail.toArray
+  private lazy val inputExprs = children.tail.toArray
 
   /** This expression is always nullable because it returns null if index 
is out of range. */
   override def nullable: Boolean = true
 
-  override def dataType: DataType = StringType
-
-  override def inputTypes: Seq[DataType] = IntegerType +: 
Seq.fill(children.size - 1)(StringType)
+  override def dataType: DataType = 
inputExprs.map(_.dataType).headOption.getOrElse(StringType)
--- End diff --

I meant the expression `elt(1)` with `eltOutputAsString` set to `false`: since 
the index is out of range, would it be better to make the result a `null` of 
`BinaryType`? Now I think your solution makes more sense.


---




[GitHub] spark pull request #20135: [SPARK-22937][SQL] SQL elt output binary for bina...

2018-01-02 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20135#discussion_r159235432
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -684,6 +685,34 @@ object TypeCoercion {
 }
   }
 
+  /**
+   * Coerces the types of [[Elt]] children to expected ones.
+   *
+   * If `spark.sql.function.eltOutputAsString` is false and all children 
types are binary,
+   * the expected types are binary. Otherwise, the expected ones are 
strings.
+   */
+  case class EltCoercion(conf: SQLConf) extends TypeCoercionRule {
+
+override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = 
plan transform { case p =>
+  p transformExpressionsUp {
+// Skip nodes if unresolved or not enough children
+case c @ Elt(children) if !c.childrenResolved || children.size < 2 
=> c
+case c @ Elt(children) if conf.eltOutputAsString ||
+!children.tail.map(_.dataType).forall(_ == BinaryType) =>
+  val index = children.head
+  val newIndex = ImplicitTypeCasts.implicitCast(index, 
IntegerType).getOrElse(index)
+  val newInputs = children.tail.map { e =>
+ImplicitTypeCasts.implicitCast(e, StringType).getOrElse(e)
+  }
+  c.copy(children = newIndex +: newInputs)
+case c @ Elt(children) =>
+  val index = children.head
+  val newIndex = ImplicitTypeCasts.implicitCast(index, 
IntegerType).getOrElse(index)
+  c.copy(children = newIndex +: children.tail)
--- End diff --

Is it better to merge the common parts of the last two cases? We could add one 
more nested pattern match, but I'm not sure that is a better way.
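
To make the suggestion concrete, a small standalone sketch of the shared decision (a hypothetical helper, not the PR's code):

```scala
import org.apache.spark.sql.types._

// The two cases differ only in whether the non-index children get cast to string;
// that choice can be computed in one place, with the index cast done once.
def eltInputTargetType(inputTypes: Seq[DataType], eltOutputAsString: Boolean): Option[DataType] =
  if (eltOutputAsString || !inputTypes.forall(_ == BinaryType)) Some(StringType)
  else None // all inputs are already binary: leave them untouched

// eltInputTargetType(Seq(BinaryType, BinaryType), eltOutputAsString = false) == None
// eltInputTargetType(Seq(BinaryType, StringType), eltOutputAsString = false) == Some(StringType)
```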


---




[GitHub] spark pull request #20135: [SPARK-22937][SQL] SQL elt output binary for bina...

2018-01-02 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20135#discussion_r159234455
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ---
@@ -271,33 +271,45 @@ case class ConcatWs(children: Seq[Expression])
   }
 }
 
+/**
+ * An expression that returns the `n`-th input in given inputs.
+ * If all inputs are binary, `elt` returns an output as binary. Otherwise, 
it returns as string.
+ * If any input is null, `elt` returns null.
+ */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(n, str1, str2, ...) - Returns the `n`-th string, e.g., 
returns `str2` when `n` is 2.",
+  usage = "_FUNC_(n, input1, input2, ...) - Returns the `n`-th input, 
e.g., returns `input2` when `n` is 2.",
   examples = """
 Examples:
   > SELECT _FUNC_(1, 'scala', 'java');
scala
   """)
 // scalastyle:on line.size.limit
-case class Elt(children: Seq[Expression])
-  extends Expression with ImplicitCastInputTypes {
+case class Elt(children: Seq[Expression]) extends Expression {
 
   private lazy val indexExpr = children.head
-  private lazy val stringExprs = children.tail.toArray
+  private lazy val inputExprs = children.tail.toArray
 
   /** This expression is always nullable because it returns null if index 
is out of range. */
   override def nullable: Boolean = true
 
-  override def dataType: DataType = StringType
-
-  override def inputTypes: Seq[DataType] = IntegerType +: 
Seq.fill(children.size - 1)(StringType)
+  override def dataType: DataType = 
inputExprs.map(_.dataType).headOption.getOrElse(StringType)
--- End diff --

Should we return a `null` of `BinaryType` when `eltOutputAsString` is `false` 
and there is only one parameter for `Elt`?
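
For illustration, a minimal sketch of the fallback in the quoted `dataType` line (simplified; not the real expression class):

```scala
import org.apache.spark.sql.types._

// With no value arguments, headOption is None and the type defaults to StringType,
// so a lone index argument never yields a null of BinaryType.
def eltDataType(inputTypes: Seq[DataType]): DataType =
  inputTypes.headOption.getOrElse(StringType)

eltDataType(Seq(BinaryType, BinaryType)) // BinaryType
eltDataType(Nil)                         // StringType: the single-parameter case in question
```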


---




[GitHub] spark issue #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over Struct...

2017-12-28 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/20010
  
This doesn't look like a regular test failure.
@bdrillard Maybe you can push a commit to trigger the test again.


---




[GitHub] spark pull request #20099: [SPARK-22916][SQL] shouldn't bias towards build r...

2017-12-28 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20099#discussion_r158961184
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -158,45 +158,65 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   def smallerSide =
 if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight 
else BuildLeft
 
-  val buildRight = canBuildRight && right.stats.hints.broadcast
-  val buildLeft = canBuildLeft && left.stats.hints.broadcast
-
-  if (buildRight && buildLeft) {
+  if (canBuildRight && canBuildLeft) {
 // Broadcast smaller side base on its estimated physical size
 // if both sides have broadcast hint
 smallerSide
-  } else if (buildRight) {
+  } else if (canBuildRight) {
 BuildRight
-  } else if (buildLeft) {
+  } else if (canBuildLeft) {
 BuildLeft
-  } else if (canBuildRight && canBuildLeft) {
+  } else {
 // for the last default broadcast nested loop join
 smallerSide
-  } else {
-throw new AnalysisException("Can not decide which side to 
broadcast for this join")
   }
 }
 
+private def needsBroadcastByHints(joinType: JoinType, left: 
LogicalPlan, right: LogicalPlan)
--- End diff --

Maybe using `canBroadcastByHints` would be more consistent with the naming 
convention?


---




[GitHub] spark pull request #20099: [SPARK-22916][SQL] shouldn't bias towards build r...

2017-12-28 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20099#discussion_r158961453
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -158,45 +158,65 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   def smallerSide =
 if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight 
else BuildLeft
 
-  val buildRight = canBuildRight && right.stats.hints.broadcast
-  val buildLeft = canBuildLeft && left.stats.hints.broadcast
-
-  if (buildRight && buildLeft) {
+  if (canBuildRight && canBuildLeft) {
 // Broadcast smaller side base on its estimated physical size
 // if both sides have broadcast hint
 smallerSide
-  } else if (buildRight) {
+  } else if (canBuildRight) {
 BuildRight
-  } else if (buildLeft) {
+  } else if (canBuildLeft) {
 BuildLeft
-  } else if (canBuildRight && canBuildLeft) {
+  } else {
 // for the last default broadcast nested loop join
 smallerSide
-  } else {
-throw new AnalysisException("Can not decide which side to 
broadcast for this join")
   }
 }
 
+private def needsBroadcastByHints(joinType: JoinType, left: 
LogicalPlan, right: LogicalPlan)
+  : Boolean = {
+  val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
+  val buildRight = canBuildRight(joinType) && 
right.stats.hints.broadcast
+  buildLeft || buildRight
+}
+
+private def broadcastSideByHints(joinType: JoinType, left: 
LogicalPlan, right: LogicalPlan)
+  : BuildSide = {
+  val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
+  val buildRight = canBuildRight(joinType) && 
right.stats.hints.broadcast
+  broadcastSide(buildLeft, buildRight, left, right)
+}
+
+private def needsBroadcastByConfig(joinType: JoinType, left: 
LogicalPlan, right: LogicalPlan)
+  : Boolean = {
+  val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
+  val buildRight = canBuildRight(joinType) && canBroadcast(right)
+  buildLeft || buildRight
+}
+
+private def broadcastSideByConfig(joinType: JoinType, left: 
LogicalPlan, right: LogicalPlan)
--- End diff --

Is is better to use `xxxbySize`? `byConfig` might confuse people.


---




[GitHub] spark issue #20043: [SPARK-22856][SQL] Add wrappers for codegen output and n...

2017-12-24 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/20043
  
@viirya Thanks a lot. So a local variable corresponds to `VariableValue` 
and `StatementValue`? IIUC, `VariableValue` is a value that depends on something 
else, but what is `StatementValue`? Maybe we can add more comments near the 
`xxValue` definitions.
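
For reference, a purely hypothetical sketch of what such wrappers could look like (the names come from this comment; the definitions are assumptions, not the PR's code):

```scala
// A VariableValue would name a generated local variable, while a StatementValue
// would wrap an inline code fragment that is re-emitted wherever it is used.
sealed trait ExprValue { def code: String }
case class LiteralValue(value: String) extends ExprValue { def code: String = value }
case class VariableValue(name: String) extends ExprValue { def code: String = name }
case class StatementValue(statement: String) extends ExprValue { def code: String = statement }

// e.g. VariableValue("value_1") vs. StatementValue("(int) (value_1 + value_2)")
```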


---




[GitHub] spark issue #20067: [SPARK-22894][SQL] DateTimeOperations should accept SQL ...

2017-12-24 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/20067
  
LGTM


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-21 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r158440114
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -158,11 +169,6 @@ object TypeCoercion {
 findTightestCommonType(t1, t2)
   .orElse(findWiderTypeForDecimal(t1, t2))
   .orElse(stringPromotion(t1, t2))
-  .orElse((t1, t2) match {
-case (ArrayType(et1, containsNull1), ArrayType(et2, 
containsNull2)) =>
-  findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || 
containsNull2))
-case _ => None
-  })
--- End diff --

I like your implementation, except for the concerns I raised in another 
thread. Have you tested locally?


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-21 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r158440005
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -158,11 +213,8 @@ object TypeCoercion {
 findTightestCommonType(t1, t2)
   .orElse(findWiderTypeForDecimal(t1, t2))
   .orElse(stringPromotion(t1, t2))
-  .orElse((t1, t2) match {
-case (ArrayType(et1, containsNull1), ArrayType(et2, 
containsNull2)) =>
-  findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || 
containsNull2))
-case _ => None
-  })
+  .orElse(findWiderTypeForTwoComplex(t1, t2, findWiderTypeForTwo))
--- End diff --

Will this cause an infinite loop, since `findWiderTypeForTwo` is called inside 
`findWiderTypeForTwo`?
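
A simplified sketch of the recursive shape being asked about (hypothetical, not the actual `TypeCoercion` code):

```scala
import org.apache.spark.sql.types._

// The nested call works on the element types, so each call strips one level of
// ArrayType; the recursion depth is bounded by the nesting depth of the type.
def widen(t1: DataType, t2: DataType): Option[DataType] = (t1, t2) match {
  case (ArrayType(e1, n1), ArrayType(e2, n2)) =>
    widen(e1, e2).map(ArrayType(_, n1 || n2))
  case _ if t1 == t2 => Some(t1)
  case _ => None // the real code would try decimal/string promotion here
}
```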


---




[GitHub] spark issue #20043: [SPARK-22856][SQL] Add wrappers for codegen output and n...

2017-12-21 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/20043
  
@viirya Sorry, I didn't quite understand: how do we easily know the value by 
adding wrappers? Could you explain a little?


---




[GitHub] spark pull request #20039: [SPARK-22850][core] Ensure queued events are deli...

2017-12-21 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20039#discussion_r158424211
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -124,13 +127,19 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
{
   }
 
   /** Post an event to all queues. */
-  def post(event: SparkListenerEvent): Unit = {
-if (!stopped.get()) {
-  metrics.numEventsPosted.inc()
+  def post(event: SparkListenerEvent): Unit = synchronized {
+if (stopped.get()) {
+  return
+}
--- End diff --

Is `synchronized` needed only due to the modification of `queuedEvents`?


---




[GitHub] spark pull request #20039: [SPARK-22850][core] Ensure queued events are deli...

2017-12-21 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20039#discussion_r158309818
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -124,13 +127,19 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
{
   }
 
   /** Post an event to all queues. */
-  def post(event: SparkListenerEvent): Unit = {
-if (!stopped.get()) {
-  metrics.numEventsPosted.inc()
+  def post(event: SparkListenerEvent): Unit = synchronized {
+if (stopped.get()) {
+  return
+}
--- End diff --

What's the difference between this and the original way?
```
if (!stopped.get()) {
  ...
}
```


---




[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19977#discussion_r157939820
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ---
@@ -48,17 +48,26 @@ import org.apache.spark.unsafe.types.{ByteArray, 
UTF8String}
   > SELECT _FUNC_('Spark', 'SQL');
SparkSQL
   """)
-case class Concat(children: Seq[Expression]) extends Expression with 
ImplicitCastInputTypes {
+case class Concat(children: Seq[Expression], isBinaryMode: Boolean = false)
+  extends Expression with ImplicitCastInputTypes {
 
-  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.size)(StringType)
-  override def dataType: DataType = StringType
+  def this(children: Seq[Expression]) = this(children, false)
--- End diff --

Sorry, I didn't get it. We already have the default parameter 
`isBinaryMode = false`.


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r157928910
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -99,6 +102,33 @@ object TypeCoercion {
 case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) 
=>
   Some(TimestampType)
 
+case (ArrayType(pointType1, nullable1), ArrayType(pointType2, 
nullable2)) =>
+  val dataType = if (withStringPromotion) {
+findWiderTypeForTwo(pointType1, pointType2)
--- End diff --

I think we break the `findTightest` semantics with the `withStringPromotion` 
check and the call to `findWiderTypeForTwo`; that logic is what we should add in 
`Case 2 type widening` -> `findWiderTypeForTwo`. I suggest we put this logic 
in another function, like `findWiderTypeForDecimal`. I will mention this in the 
other thread.


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r157929494
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -158,11 +169,6 @@ object TypeCoercion {
 findTightestCommonType(t1, t2)
   .orElse(findWiderTypeForDecimal(t1, t2))
   .orElse(stringPromotion(t1, t2))
-  .orElse((t1, t2) match {
-case (ArrayType(et1, containsNull1), ArrayType(et2, 
containsNull2)) =>
-  findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || 
containsNull2))
-case _ => None
-  })
--- End diff --

My suggestion: we define a new function, like `findWiderTypeForArray`. This 
new function provides the 'findWider' functionality, compared to the 
'findTightestArray' part (which is basically the first commit in your PR). That 
way we won't break the original `findTightest` semantics, and the code stays 
clean.
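
Roughly what that could look like (a hedged sketch; the function name is taken from the comment, everything else is assumed):

```scala
import org.apache.spark.sql.types._

// Keep findTightestCommonType untouched and put the array widening in its own
// helper, parameterized by the element-level widening function.
def findWiderTypeForArray(
    t1: DataType,
    t2: DataType,
    widenElement: (DataType, DataType) => Option[DataType]): Option[DataType] =
  (t1, t2) match {
    case (ArrayType(e1, n1), ArrayType(e2, n2)) =>
      widenElement(e1, e2).map(ArrayType(_, n1 || n2))
    case _ => None
  }
```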


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r157926754
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -99,6 +99,17 @@ object TypeCoercion {
 case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) 
=>
   Some(TimestampType)
 
+case (t1 @ ArrayType(pointType1, nullable1), t2 @ 
ArrayType(pointType2, nullable2))
+if t1.sameType(t2) =>
+  val dataType = findTightestCommonType(pointType1, pointType2).get
--- End diff --

Thanks for your illustration.


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r157926722
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -158,11 +169,6 @@ object TypeCoercion {
 findTightestCommonType(t1, t2)
   .orElse(findWiderTypeForDecimal(t1, t2))
   .orElse(stringPromotion(t1, t2))
-  .orElse((t1, t2) match {
-case (ArrayType(et1, containsNull1), ArrayType(et2, 
containsNull2)) =>
-  findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || 
containsNull2))
-case _ => None
-  })
--- End diff --

Thanks for your illustration.


---




[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19977#discussion_r157819483
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ---
@@ -48,17 +48,26 @@ import org.apache.spark.unsafe.types.{ByteArray, 
UTF8String}
   > SELECT _FUNC_('Spark', 'SQL');
SparkSQL
   """)
-case class Concat(children: Seq[Expression]) extends Expression with 
ImplicitCastInputTypes {
+case class Concat(children: Seq[Expression], isBinaryMode: Boolean = false)
+  extends Expression with ImplicitCastInputTypes {
 
-  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.size)(StringType)
-  override def dataType: DataType = StringType
+  def this(children: Seq[Expression]) = this(children, false)
--- End diff --

Is this not needed?


---




[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19977#discussion_r157793004
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1035,6 +1035,12 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val ConcatBinaryModeEnabled = 
buildConf("spark.sql.expression.concat.binaryMode.enabled")
--- End diff --

Maybe `CONCAT_BINARY_AS_STRING_ENABLED`?


---




[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19977
  
Do you mean MySQL's answers are unexpected? I think it's common for these 
databases to behave differently, while Spark mainly follows Hive.


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r157780706
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -99,6 +99,17 @@ object TypeCoercion {
 case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) 
=>
   Some(TimestampType)
 
+case (t1 @ ArrayType(pointType1, nullable1), t2 @ 
ArrayType(pointType2, nullable2))
+if t1.sameType(t2) =>
+  val dataType = findTightestCommonType(pointType1, pointType2).get
--- End diff --

@mgaido91 Thanks, I got it.


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r157775323
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -99,6 +99,17 @@ object TypeCoercion {
 case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) 
=>
   Some(TimestampType)
 
+case (t1 @ ArrayType(pointType1, nullable1), t2 @ 
ArrayType(pointType2, nullable2))
+if t1.sameType(t2) =>
+  val dataType = findTightestCommonType(pointType1, pointType2).get
--- End diff --

Let's use this example: if we call `findTightestCommonType` on two 
array-of-array-of-integer types, then this `dataType` = 
`findTightest(findTightest(IntegralType, IntegralType))` = `IntegralType`, so 
the final answer is `Some(ArrayType(IntegralType))`?


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r157697044
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -99,6 +99,17 @@ object TypeCoercion {
 case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) 
=>
   Some(TimestampType)
 
+case (t1 @ ArrayType(pointType1, nullable1), t2 @ 
ArrayType(pointType2, nullable2))
+if t1.sameType(t2) =>
+  val dataType = findTightestCommonType(pointType1, pointType2).get
+  Some(ArrayType(dataType, nullable1 || nullable2))
+
+case (t1 @ MapType(keyType1, valueType1, nullable1),
+t2 @ MapType(keyType2, valueType2, nullable2)) if t1.sameType(t2) 
=>
+  val keyType = findTightestCommonType(keyType1, keyType2).get
+  val valueType = findTightestCommonType(valueType1, valueType2).get
--- End diff --

Also here


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r157696626
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -99,6 +99,17 @@ object TypeCoercion {
 case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) 
=>
   Some(TimestampType)
 
+case (t1 @ ArrayType(pointType1, nullable1), t2 @ 
ArrayType(pointType2, nullable2))
+if t1.sameType(t2) =>
+  val dataType = findTightestCommonType(pointType1, pointType2).get
--- End diff --

If `t1` and `t2` satisfy `sameType`, why do we need to call 
`findTightestCommonType` recursively? Can we just use `val dataType = pointType1`?
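
A small illustration of why `pointType1` alone may not be enough (the helper below is a stand-in for `DataType.sameType`, which, as far as I can tell, compares types ignoring nullability):

```scala
import org.apache.spark.sql.types._

// sameType-style comparison: equal up to nullability of nested elements.
def sameTypeIgnoringNullability(a: DataType, b: DataType): Boolean = (a, b) match {
  case (ArrayType(ea, _), ArrayType(eb, _)) => sameTypeIgnoringNullability(ea, eb)
  case _ => a == b
}

val a = ArrayType(IntegerType, containsNull = false)
val b = ArrayType(IntegerType, containsNull = true)
sameTypeIgnoringNullability(a, b) // true
a == b                            // false: containsNull differs, which the recursive call reconciles
```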


---




[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20010#discussion_r157695599
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -99,6 +99,17 @@ object TypeCoercion {
 case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) 
=>
   Some(TimestampType)
 
+case (t1 @ ArrayType(pointType1, nullable1), t2 @ 
ArrayType(pointType2, nullable2))
+if t1.sameType(t2) =>
--- End diff --

This will make sure the later `get` won't be applied to `None` 


---




[GitHub] spark pull request #20015: [SPARK-22829] Add new built-in function date_trun...

2017-12-19 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20015#discussion_r157686437
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -1295,87 +1295,184 @@ case class ParseToTimestamp(left: Expression, 
format: Option[Expression], child:
   override def dataType: DataType = TimestampType
 }
 
-/**
- * Returns date truncated to the unit specified by the format.
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "_FUNC_(date, fmt) - Returns `date` with the time portion of the 
day truncated to the unit specified by the format model `fmt`.",
-  examples = """
-Examples:
-  > SELECT _FUNC_('2009-02-12', 'MM');
-   2009-02-01
-  > SELECT _FUNC_('2015-10-27', 'YEAR');
-   2015-01-01
-  """,
-  since = "1.5.0")
-// scalastyle:on line.size.limit
-case class TruncDate(date: Expression, format: Expression)
-  extends BinaryExpression with ImplicitCastInputTypes {
-  override def left: Expression = date
-  override def right: Expression = format
-
-  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, 
StringType)
-  override def dataType: DataType = DateType
+trait TruncTime extends BinaryExpression with ImplicitCastInputTypes {
+  val time: Expression
+  val format: Expression
   override def nullable: Boolean = true
-  override def prettyName: String = "trunc"
 
   private lazy val truncLevel: Int =
 DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
 
-  override def eval(input: InternalRow): Any = {
+  /**
+   *
+   * @param input
+   * @param maxLevel Maximum level that can be used for truncation (e.g 
MONTH for Date input)
+   * @param truncFunc
+   * @tparam T
+   * @return
+   */
+  protected def evalHelper[T](input: InternalRow, maxLevel: Int)(
+truncFunc: (Any, Int) => T): Any = {
--- End diff --

Maybe `truncFunc: (Any, Int) => Any` is enough, so we don't need the type 
parameter `T`? I'm not sure whether that is better, though...


---




[GitHub] spark pull request #20015: [SPARK-22829] Add new built-in function date_trun...

2017-12-18 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20015#discussion_r157676669
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -1295,87 +1295,184 @@ case class ParseToTimestamp(left: Expression, 
format: Option[Expression], child:
   override def dataType: DataType = TimestampType
 }
 
-/**
- * Returns date truncated to the unit specified by the format.
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "_FUNC_(date, fmt) - Returns `date` with the time portion of the 
day truncated to the unit specified by the format model `fmt`.",
-  examples = """
-Examples:
-  > SELECT _FUNC_('2009-02-12', 'MM');
-   2009-02-01
-  > SELECT _FUNC_('2015-10-27', 'YEAR');
-   2015-01-01
-  """,
-  since = "1.5.0")
-// scalastyle:on line.size.limit
-case class TruncDate(date: Expression, format: Expression)
-  extends BinaryExpression with ImplicitCastInputTypes {
-  override def left: Expression = date
-  override def right: Expression = format
-
-  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, 
StringType)
-  override def dataType: DataType = DateType
+trait TruncTime extends BinaryExpression with ImplicitCastInputTypes {
+  val time: Expression
+  val format: Expression
   override def nullable: Boolean = true
-  override def prettyName: String = "trunc"
 
   private lazy val truncLevel: Int =
 DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
 
-  override def eval(input: InternalRow): Any = {
+  /**
+   *
+   * @param input
+   * @param maxLevel Maximum level that can be used for truncation (e.g 
MONTH for Date input)
+   * @param truncFunc
+   * @tparam T
+   * @return
+   */
+  protected def evalHelper[T](input: InternalRow, maxLevel: Int)(
+truncFunc: (Any, Int) => T): Any = {
 val level = if (format.foldable) {
   truncLevel
 } else {
   DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
 }
-if (level == -1) {
+if (level == DateTimeUtils.TRUNC_INVALID || level > maxLevel) {
--- End diff --

`// unknown format or too small level`?


---




[GitHub] spark pull request #20015: [SPARK-22829] Add new built-in function date_trun...

2017-12-18 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20015#discussion_r157678588
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -1295,87 +1295,184 @@ case class ParseToTimestamp(left: Expression, 
format: Option[Expression], child:
   override def dataType: DataType = TimestampType
 }
 
-/**
- * Returns date truncated to the unit specified by the format.
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "_FUNC_(date, fmt) - Returns `date` with the time portion of the 
day truncated to the unit specified by the format model `fmt`.",
-  examples = """
-Examples:
-  > SELECT _FUNC_('2009-02-12', 'MM');
-   2009-02-01
-  > SELECT _FUNC_('2015-10-27', 'YEAR');
-   2015-01-01
-  """,
-  since = "1.5.0")
-// scalastyle:on line.size.limit
-case class TruncDate(date: Expression, format: Expression)
-  extends BinaryExpression with ImplicitCastInputTypes {
-  override def left: Expression = date
-  override def right: Expression = format
-
-  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, 
StringType)
-  override def dataType: DataType = DateType
+trait TruncTime extends BinaryExpression with ImplicitCastInputTypes {
+  val time: Expression
+  val format: Expression
   override def nullable: Boolean = true
-  override def prettyName: String = "trunc"
 
   private lazy val truncLevel: Int =
 DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
 
-  override def eval(input: InternalRow): Any = {
+  /**
+   *
+   * @param input
+   * @param maxLevel Maximum level that can be used for truncation (e.g 
MONTH for Date input)
+   * @param truncFunc
+   * @tparam T
+   * @return
+   */
+  protected def evalHelper[T](input: InternalRow, maxLevel: Int)(
+truncFunc: (Any, Int) => T): Any = {
 val level = if (format.foldable) {
   truncLevel
 } else {
   DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
 }
-if (level == -1) {
+if (level == DateTimeUtils.TRUNC_INVALID || level > maxLevel) {
   // unknown format
   null
 } else {
-  val d = date.eval(input)
+  val d = time.eval(input)
   if (d == null) {
 null
   } else {
-DateTimeUtils.truncDate(d.asInstanceOf[Int], level)
+truncFunc(d, level)
   }
 }
   }
 
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+  protected def codeGenHelper[T](
--- End diff --

Why do we need a type parameter `T`?


---




[GitHub] spark pull request #20015: [SPARK-22829] Add new built-in function date_trun...

2017-12-18 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20015#discussion_r157680290
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -944,9 +954,16 @@ object DateTimeUtils {
 date + daysToMonthEnd
   }
 
-  private val TRUNC_TO_YEAR = 1
-  private val TRUNC_TO_MONTH = 2
-  private val TRUNC_INVALID = -1
+  // Visible for testing.
+  val TRUNC_TO_YEAR = 1
+  val TRUNC_TO_MONTH = 2
+  val TRUNC_TO_DAY = 3
+  val TRUNC_TO_HOUR = 4
+  val TRUNC_TO_MINUTE = 5
+  val TRUNC_TO_SECOND = 6
+  val TRUNC_TO_WEEK = 7
+  val TRUNC_TO_QUARTER = 8
+  val TRUNC_INVALID = -1
--- End diff --

Can we move quarter and week forward, maybe to 3 and 4? Then the constants 
conform better to the order of time granularity, and the max-level design is 
not affected.


---




[GitHub] spark pull request #20015: [SPARK-22829] Add new built-in function date_trun...

2017-12-18 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20015#discussion_r157674840
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -1295,87 +1295,184 @@ case class ParseToTimestamp(left: Expression, 
format: Option[Expression], child:
   override def dataType: DataType = TimestampType
 }
 
-/**
- * Returns date truncated to the unit specified by the format.
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "_FUNC_(date, fmt) - Returns `date` with the time portion of the 
day truncated to the unit specified by the format model `fmt`.",
-  examples = """
-Examples:
-  > SELECT _FUNC_('2009-02-12', 'MM');
-   2009-02-01
-  > SELECT _FUNC_('2015-10-27', 'YEAR');
-   2015-01-01
-  """,
-  since = "1.5.0")
-// scalastyle:on line.size.limit
-case class TruncDate(date: Expression, format: Expression)
-  extends BinaryExpression with ImplicitCastInputTypes {
-  override def left: Expression = date
-  override def right: Expression = format
-
-  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, 
StringType)
-  override def dataType: DataType = DateType
+trait TruncTime extends BinaryExpression with ImplicitCastInputTypes {
+  val time: Expression
+  val format: Expression
   override def nullable: Boolean = true
-  override def prettyName: String = "trunc"
 
   private lazy val truncLevel: Int =
 DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
 
-  override def eval(input: InternalRow): Any = {
+  /**
+   *
+   * @param input
+   * @param maxLevel Maximum level that can be used for truncation (e.g 
MONTH for Date input)
+   * @param truncFunc
+   * @tparam T
+   * @return
+   */
+  protected def evalHelper[T](input: InternalRow, maxLevel: Int)(
+truncFunc: (Any, Int) => T): Any = {
 val level = if (format.foldable) {
   truncLevel
 } else {
   DateTimeUtils.parseTruncLevel(format.eval().asInstanceOf[UTF8String])
 }
-if (level == -1) {
+if (level == DateTimeUtils.TRUNC_INVALID || level > maxLevel) {
   // unknown format
   null
 } else {
-  val d = date.eval(input)
+  val d = time.eval(input)
--- End diff --

nit: Since this is a time, it can be `val t = ...`


---




[GitHub] spark issue #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle read less ...

2017-12-13 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19862
  
cc @cloud-fan @hvanhovell  @viirya 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle read less ...

2017-12-13 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19862
  
This is actually a small change, but it can provide a sizable optimization 
for users who don't use `WholeStageCodegen`, for example users who are still on 
Spark versions before 2.0.

Also, the codegen and non-codegen code paths are supposed to behave the same, 
so in that sense this is a 'bug'.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-12 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r156581645
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -159,6 +154,12 @@ public boolean hasNext() {
 @Override
 public UnsafeRow next() {
   try {
+if (!alreadyCalculated) {
+  while (inputIterator.hasNext()) {
+insertRow(inputIterator.next());
+  }
+  alreadyCalculated = true;
+}
 sortedIterator.loadNext();
--- End diff --

Yes, you are right. Now I update the `sortedIterator` after inserting rows. 
Because only final outer fields can be accessed from inside the inner class, I 
used a one-element array as a mutable holder. Is there a better solution?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-04 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154635850
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Good advice. Thx.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: [WIP][SPARK-22671][SQL] Make SortMergeJoin shuffl...

2017-12-04 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154581844
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -159,6 +154,12 @@ public boolean hasNext() {
 @Override
 public UnsafeRow next() {
   try {
+if (!alreadyCalculated) {
--- End diff --

Yes. There are mistakes here, I will change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154563897
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
   private[this] val bufferedMatches =
 new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
 
-  // Initialization (note: do _not_ want to advance streamed here).
-  advancedBufferedToRowWithNullFreeJoinKey()
+  // Initialization (note: do _not_ want to advance streamed here). This 
is made lazy to prevent
+  // unnecessary trigger of calculation.
+  private lazy val advancedBufferedIterRes = 
advancedBufferedToRowWithNullFreeJoinKey()
--- End diff --

This function should be called (to try to set `bufferedRow`) before 
`bufferedRow` is checked, and it should be called only once; that is what the 
original logic requires. Given that, making it a lazy val seems like the best 
way to add this optimization.
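
Roughly, the pattern is just this (illustrative names, not the real
`SortMergeJoinScanner` fields):

    class Scanner(bufferedIter: Iterator[Int]) {
      private var bufferedRow: Option[Int] = None

      // Evaluated at most once, the first time it is referenced -- which is
      // exactly the "call it once, before bufferedRow is checked" requirement.
      private lazy val advancedBufferedIterRes: Boolean =
        if (bufferedIter.hasNext) { bufferedRow = Some(bufferedIter.next()); true } else false

      def findNextMatch(): Boolean = {
        advancedBufferedIterRes   // forces the one-time advance
        bufferedRow.isDefined
      }
    }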


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154564327
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

This advance function is actually only called once, so no `bufferedRow` 
will be missed. Or maybe I misunderstood your point?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154564488
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
+  if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
--- End diff --

I agree with you.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19862: [SPARK-22671][SQL] Make SortMergeJoin read less data whe...

2017-12-01 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19862
  
cc @cloud-fan @viirya @ConeyLiu 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: Make SortMergeJoin read less data when wholeStage...

2017-12-01 Thread gczsjdy
GitHub user gczsjdy opened a pull request:

https://github.com/apache/spark/pull/19862

Make SortMergeJoin read less data when wholeStageCodegen is off

## What changes were proposed in this pull request?

In SortMergeJoin (with wholeStageCodegen), an optimization already exists: 
if the left table of a partition is empty, there is no need to read the right 
table of the corresponding partition. This benefits the case in which many 
partitions of the left table are empty and the right table is big.

In the code path without wholeStageCodegen, however, this optimization doesn't 
happen. This is mainly due to the lack of this optimization in the non-codegen 
SortMergeJoin and the lack of support in `SortExec`, which doesn't evaluate 
lazily. UI when wholeStageCodegen is off:
![image](https://user-images.githubusercontent.com/7685352/33493586-8311ac58-d6fb-11e7-816c-7a0fb2065345.PNG)

When the switch is on: 

![image](https://user-images.githubusercontent.com/7685352/33493675-c821b81a-d6fb-11e7-8cf8-2e5baff75be3.png)

This PR makes the sort lazily evaluated, and only triggers the right-table read 
after the left table has been read and found to be non-empty.
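
The core idea can be sketched like this (illustrative only, not the actual
`SortExec` change):

    // Pass the right side by name, so it is never materialized when the left
    // partition turns out to be empty.
    def joinPartition[T](left: Iterator[T], right: => Iterator[T])(
        doJoin: (Iterator[T], Iterator[T]) => Iterator[T]): Iterator[T] =
      if (!left.hasNext) Iterator.empty else doJoin(left, right)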

After this PR, with wholeStageCodegen off:

![image](https://user-images.githubusercontent.com/7685352/33493784-2e1ee89a-d6fc-11e7-8201-71273de0b857.png)

## How was this patch tested?

Existing test suites.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gczsjdy/spark smj_less_read

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19862.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19862


commit a8511e8b7e0240c471f88e703545301425960901
Author: GuoChenzhao <chenzhao@intel.com>
Date:   2017-11-28T09:01:34Z

Init solution

commit a5c14b473636e571c6d4f17798f220be080a27a1
Author: GuoChenzhao <chenzhao@intel.com>
Date:   2017-11-28T09:31:07Z

Style

commit a26ca57d56b0b2df81daa82da49f4ff564fc10f5
Author: GuoChenzhao <chenzhao@intel.com>
Date:   2017-11-28T09:37:54Z

Comments

commit 6d875f84de95d67f84ef9774cb6b6ee8273d46a6
Author: GuoChenzhao <chenzhao@intel.com>
Date:   2017-12-01T11:32:06Z

lazy evaluation in SortExec




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19823: [SPARK-22601][SQL] Data load is getting displayed...

2017-11-27 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19823#discussion_r153131202
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -341,6 +341,12 @@ case class LoadDataCommand(
   } else {
 val uri = new URI(path)
 if (uri.getScheme() != null && uri.getAuthority() != null) {
+  val hadoopConf = sparkSession.sessionState.newHadoopConf()
+  val srcPath = new Path(path)
--- End diff --

But won't this lose the normalization done in `new Path(path)`? Or does the 
normalization in `URI` cover it?
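
To illustrate what I mean (this reflects my understanding of the behaviour and
is worth double-checking against the Hadoop `Path` and `java.net.URI` docs):

    import java.net.URI
    import org.apache.hadoop.fs.Path

    val raw = "hdfs://host:9000/user//data/"
    // Path normalizes on construction: the "//" is collapsed and the
    // trailing "/" is dropped.
    val asPath = new Path(raw).toString
    // URI keeps the string as given; nothing is normalized unless
    // .normalize() is explicitly called.
    val asUri = new URI(raw).toString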


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19823: [SPARK-22601][SQL] Data load is getting displayed...

2017-11-27 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19823#discussion_r153127637
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2624,7 +2624,13 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
 val e = intercept[AnalysisException](sql("SELECT nvl(1, 2, 3)"))
 assert(e.message.contains("Invalid number of arguments"))
   }
-
+  test("load command invalid path validation ") {
--- End diff --

nit: also blank lines


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-26 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19788
  
Can we just add the `ContinuousShuffleBlockId` without adding the new conf 
`spark.shuffle.continuousFetch`? In classes related to shuffle read, like 
`ShuffleBlockFetcherIterator`, we can also pattern match the original 
`ShuffleBlockId`. This way no additional confs are needed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

2017-11-26 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19764
  
@caneGuy Can you give a specific example to illustrate your change? Maybe the 
partitioning result before and after the change?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-26 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19788
  
What does `external shuffle service` refer to here? Can you explain a little bit?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-26 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153117088
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -116,8 +117,8 @@ object BlockId {
   def apply(name: String): BlockId = name match {
 case RDD(rddId, splitIndex) =>
   RDDBlockId(rddId.toInt, splitIndex.toInt)
-case SHUFFLE(shuffleId, mapId, reduceId) =>
-  ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+case SHUFFLE(shuffleId, mapId, reduceId, n) =>
--- End diff --

nit: length?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152921091
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
--- End diff --

I think that code would confuse people, and the extra comments needed to 
explain it don't seem worth it.
In most cases the default value is enough, so would adding an assertion plus an 
explanation in the docs be good enough?
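
Something along these lines is what I have in mind (only a sketch; the exact
doc wording and default are open):

    private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
      ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
        .internal()
        .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater " +
          "than or equal to this threshold. Note that the actual parallelism is also capped by " +
          "the number of available cores.")
        .intConf
        .checkValue(v => v > 0, "The threshold should be positive.")
        .createWithDefault(10000000)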


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-24 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152920483
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Yeah, I will add some.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152912084
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Do you think it's necessary to describe here how the actual parallelism is 
calculated?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152911325
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

`statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1 >= 2` 
is equivalent to `statuses.length.toLong * totalSizes.length >= 
parallelAggThreshold` (with integer division, `x / t + 1 >= 2` holds exactly 
when `x >= t`), so the product doesn't need to be 2x the threshold; not being 
smaller than 1x is enough.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152907079
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Sorry, but I didn't get your point.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152906960
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length.toLong * totalSizes.length / parallelAggThreshold 
+ 1).toInt
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, 
"map-output-aggregate")
--- End diff --

I don't think we need to fully utilize all available processors. 
`parallelAggThreshold` defaults to 10^7, which means each split is a relatively 
small task, so the tasks don't need to be cut smaller in most cases.
For the cases where a split is still a big task, `parallelAggThreshold` should 
be tuned. This is not very direct, because there is no `xx.parallelism` config 
to set, but the benefit is that we introduce fewer configs.
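
To make it concrete with made-up numbers (just an illustration, assuming the
10^7 default):

    val availableProcessors = 32
    val mappers = 2000
    val shufflePartitions = 10976
    val parallelAggThreshold = 10000000L
    val parallelism = math.min(
      availableProcessors,
      mappers.toLong * shufflePartitions / parallelAggThreshold + 1).toInt
    // mappers * partitions = 21,952,000, so parallelism = min(32, 2 + 1) = 3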


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152888380
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

@cloud-fan Regarding `We can shut down the pool after some certain idle time, 
but not sure if it's worth the complexity`: I know we don't need to do this now, 
but if we did, how would we do it?
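
For the record, one way I can think of (just a sketch of my own assumption, not
something this PR needs):

    import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

    // A pool whose idle core threads time out after 60s, so it drains itself
    // when unused and never needs an explicit shutdown call.
    val numThreads = 8   // illustrative
    val pool = new ThreadPoolExecutor(
      numThreads, numThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable]())
    pool.allowCoreThreadTimeOut(true)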


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152888257
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

My fault!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19763
  
@cloud-fan Seems Jenkins has not started?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152827467
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

@zsxwing Actually I built with sbt/mvn and saw no errors...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-22 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152493779
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

I agree with you. With the thread pool kept in the class, the only loss is 
that the pool still exists even when only a single thread is used; the gain is 
that we avoid creating a new pool after every shuffle.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152193203
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,14 @@ private[spark] object MapOutputTracker extends 
Logging {
 logError(errorMessage)
 throw new MetadataFetchFailedException(shuffleId, startPartition, 
errorMessage)
   } else {
+var n = 0
+var totalSize = 0L
 for (part <- startPartition until endPartition) {
-  splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) 
+=
-((ShuffleBlockId(shuffleId, mapId, part), 
status.getSizeForBlock(part)))
+  n += 1
+  totalSize += status.getSizeForBlock(part)
 }
--- End diff --

`n` could be named `numPartitions`, and computed directly as 
`endPartition - startPartition`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19755: [SPARK-22524][SQL] Subquery shows reused on UI SQL tab e...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19755
  
I can't find a way to distinguish a `reused` from an `unreused` subquery. For 
example, in the `ReuseSubquery` rule, after the 1st `SubqueryExec` (with 
`unreused` in its name) is seen, it is buffered. When the rule sees the 2nd 
`SubqueryExec`, it makes the 2nd `PlanExpression` hold the same buffered 
`SubqueryExec`. This is the reuse process, but it means we cannot change the 
physical plan's name afterwards to indicate that it is reused. I thought about 
adding a var state in `SubqueryExec`, but it seems few physical plans do that. 
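
To spell out why the name can't be patched afterwards, the buffering is roughly
this (a toy model in plain Scala, not the real rule):

    import scala.collection.mutable

    case class Subquery(name: String)
    val buffered = mutable.ArrayBuffer[Subquery]()

    // The 1st occurrence gets buffered (already constructed and named); the
    // 2nd occurrence just receives that buffered instance back, so by the time
    // we know it is reused, its name is already fixed.
    def lookup(s: Subquery): Subquery =
      buffered.find(_.name == s.name).getOrElse { buffered += s; s }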


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152185531
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

Actually there are 3 confs like that in this file... do they all need config entries?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152021708
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
--- End diff --

Sure : )


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152017736
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

There is also a `spark.shuffle.mapOutput.dispatcher.numThreads` in this file 
without a config entry; do I need to add one?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152016240
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  if (statuses.length * totalSizes.length <=
+conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD)) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+val parallelism = 
conf.getInt("spark.adaptive.map.statistics.cores", 8)
--- End diff --

I thought only adaptive execution code would call this. But actually it seems 
this is called after all the `ShuffleMapTask`s of a stage have completed, 
right?  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152008181
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

`spark.adaptive.map.statistics.cores` needs a config entry, but I thought the 
adaptive.xxx items had already been put under `spark.sql.`, so it might be 
inconsistent. Now I think it's no big deal.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152006310
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

I think that's not a big problem; adaptive execution needs both core and SQL 
code, so both confs are needed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152005905
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

Like 
https://github.com/gczsjdy/spark/blob/11b60af737a04d931356aa74ebf3c6cf4a6b08d6/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L204-L204
 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152002860
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
+
ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions exceeds this " +
+"threshold")
+  .intConf
+  .createWithDefault(1)
--- End diff --

Now I also think it's a little bit large... In the case I mentioned, the 5s 
gap came from a workload of about 10^8 (mappers * shuffle partitions). Maybe 
10^7 is a good default?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152002262
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val 
SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
--- End diff --

`spark.sql.adaptive.xxx` already exists; will this be a problem? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-20 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151921740
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutputStatisticsMultithreadThreshold")
--- End diff --

Yes, it's better!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

2017-11-16 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19763
  
Actually, the time gap is O(number of mappers * shuffle partitions). In this 
case the number of mappers is not very large, but users are more likely to be 
slowed down when they run on a big data set.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

2017-11-16 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19763
  
This happens a lot in our TPC-DS 100TB test. The master node has an Intel Xeon 
E5-2699 v4 @ 2.2GHz CPU, which affects the driver's performance, and we set 
`spark.sql.shuffle.partitions` to 10976. Shuffle partitions * number of mappers 
determines how much work the driver does.

Let's take TPC-DS q67 as an example:
Without this PR, there is a gap of about 5s between the map and reduce stages 
(47:39 - (41:16 + 6.3 min)), most of which is spent aggregating the map 
statistics in a single thread.
![image](https://user-images.githubusercontent.com/7685352/32893095-49216a4a-cb13-11e7-82fe-ccb552a6a625.PNG)
With this PR, the gap is about 0s (25:32 - (18:58 + 6.6 min)):
![image](https://user-images.githubusercontent.com/7685352/32893264-beb31b82-cb13-11e7-954f-a893f6a9966f.PNG)




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19755: [SPARK-22524][SQL] Subquery shows reused on UI SQL tab e...

2017-11-16 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19755
  
This targets subqueries that are not reused; reused subqueries are already 
shown correctly on the UI. @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-15 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151339166
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
+  /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
-}
+  val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
+  var taskSlices = parallelism
+
+  equallyDivide(totalSizes.length, taskSlices).foreach {
+reduceIds =>
+  mapStatusSubmitTasks += threadPoolMapStats.submit(
+new Runnable {
+  override def run(): Unit = {
+for (s <- statuses; i <- reduceIds) {
+  totalSizes(i) += s.getSizeForBlock(i)
+}
+  }
+}
+  )
   }
+  mapStatusSubmitTasks.foreach(_.get())
--- End diff --

Should I use the `scala.concurrent.ExecutionContext.Implicits.global` 
ExecutionContext?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-15 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r151332369
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -473,16 +477,41 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   /**
+   * Try to equally divide Range(0, num) to divisor slices
+   */
+  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
+assert(divisor > 0, "Divisor should be positive")
+val (each, remain) = (num / divisor, num % divisor)
+val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
+if (each != 0) {
+  smaller.grouped(each) ++ bigger.grouped(each + 1)
+} else {
+  bigger.grouped(each + 1)
+}
+  }
+
+  /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
-}
+  val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
+  var taskSlices = parallelism
+
+  equallyDivide(totalSizes.length, taskSlices).foreach {
+reduceIds =>
+  mapStatusSubmitTasks += threadPoolMapStats.submit(
+new Runnable {
+  override def run(): Unit = {
+for (s <- statuses; i <- reduceIds) {
+  totalSizes(i) += s.getSizeForBlock(i)
+}
+  }
+}
+  )
   }
+  mapStatusSubmitTasks.foreach(_.get())
--- End diff --

Good idea, thx!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19763: [SPARK-22537] Aggregation of map output statistics on dr...

2017-11-15 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19763
  
cc @cloud-fan @viirya @gatorsmile @chenghao-intel 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537] Aggregation of map output statistic...

2017-11-15 Thread gczsjdy
GitHub user gczsjdy opened a pull request:

https://github.com/apache/spark/pull/19763

[SPARK-22537] Aggregation of map output statistics on driver faces single 
point bottleneck

## What changes were proposed in this pull request?

In adaptive execution, the map output statistics of all mappers are aggregated 
after the previous stage has successfully executed. The driver does this 
aggregation, and it gets slow when the number of `mappers * shuffle partitions` 
is large, since only a single thread is used. This PR uses multiple threads to 
remove this single-point bottleneck.

## How was this patch tested?

Test cases are in `MapOutputTrackerSuite.scala`


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gczsjdy/spark single_point_mapstatistics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19763.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19763


commit 5dd04872e983de861a301c22a124dd8923ccc8c6
Author: GuoChenzhao <chenzhao@intel.com>
Date:   2017-11-16T02:58:22Z

Use multi-thread to solve single point bottleneck

commit 819774fc7087c51a4b7b03213bfb330331d6f108
Author: GuoChenzhao <chenzhao@intel.com>
Date:   2017-11-16T03:01:21Z

Add test case

commit da028258bd172b6d3ff89504097fb6651f5c05c0
Author: GuoChenzhao <chenzhao@intel.com>
Date:   2017-11-16T03:24:47Z

Style




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19755: [SPARK-22524][SQL] Subquery shows reused on UI SQL tab e...

2017-11-15 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19755
  
But it might confuse users; I think what shows on the UI is supposed to be 
exactly what gets executed. Maybe accuracy is more important than visual clarity 
here. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19755: [SPARK-22524][SQL] Subquery shows reused on UI SQL tab e...

2017-11-15 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19755
  
@cloud-fan @viirya @carsonwang @gatorsmile @yucai Could you please help me 
review this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19755: [SPARK-22524] Subquery shows reused on UI SQL tab...

2017-11-15 Thread gczsjdy
GitHub user gczsjdy opened a pull request:

https://github.com/apache/spark/pull/19755

[SPARK-22524] Subquery shows reused on UI SQL tab even if it's not reused

After manually disabling the `reuseSubquery` rule, subqueries are no longer 
reused. But on the SQL graph there is still only one subquery node, so it looks 
as if the subquery were reused.

`SparkPlanGraph` compares different `SparkPlanInfo`s by their `simpleString`, 
which requires `simpleString` to be designed carefully. Since `SubqueryExec`'s 
`simpleString` depends only on the `exprId`, we need to add an extra index to 
its name.
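
To illustrate the effect described above, here is a toy sketch. The `PlanInfo`
case class, `graphNodes` helper, and the example strings are made up for the
illustration and are not Spark's actual `SparkPlanGraph` code:

    // Toy model of node deduplication by simpleString (made-up classes, not Spark's).
    object SubqueryNodeDemo {
      case class PlanInfo(simpleString: String)

      // Collapse plans whose simpleString is identical into a single graph node.
      def graphNodes(plans: Seq[PlanInfo]): Seq[String] =
        plans.map(_.simpleString).distinct

      def main(args: Array[String]): Unit = {
        // Two separate (non-reused) subquery nodes whose simpleString only carries the exprId:
        val before = Seq(PlanInfo("Subquery subquery#23"), PlanInfo("Subquery subquery#23"))
        println(graphNodes(before).size) // 1 -- rendered as one, apparently reused, node

        // With an extra index in the name, the two nodes stay distinct on the graph:
        val after = Seq(PlanInfo("Subquery:1 subquery#23"), PlanInfo("Subquery:2 subquery#23"))
        println(graphNodes(after).size)  // 2
      }
    }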


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gczsjdy/spark subquery_ui

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19755


commit 7aa148c948a0610779dccc61e9f5c8e6893c
Author: GuoChenzhao <chenzhao@intel.com>
Date:   2017-11-15T08:00:36Z

Add to subquery name




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #11403: [SPARK-13523] [SQL] Reuse exchanges in a query

2017-10-29 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/11403
  
@davies Hi, what do you mean by "Since all the planner only work with tree, 
so this rule should be the last one for the entire planning."?
Thanks if you have time.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17359: [SPARK-20028][SQL] Add aggregate expression nGra...

2017-10-10 Thread gczsjdy
Github user gczsjdy closed the pull request at:

https://github.com/apache/spark/pull/17359


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17359: [SPARK-20028][SQL] Add aggregate expression nGrams

2017-10-10 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/17359
  
Sorry, but I think this is inactive. Thanks for your attention. @wzhfy 
@viirya @gatorsmile 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


