[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2017-09-18 Thread wzhfy
Github user wzhfy commented on the issue:

https://github.com/apache/spark/pull/15544
  
retest this please


---




[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15544
  
**[Test build #81881 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81881/testReport)**
 for PR 15544 at commit 
[`0e9a8dd`](https://github.com/apache/spark/commit/0e9a8dd4004c3331271105e4e9a3df963c3bc84b).


---




[GitHub] spark issue #19234: [WIP][SPARK-22010][PySpark] Change fromInternal method o...

2017-09-18 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19234
  
Seems fine to me too as is. @maver1ck, I think you could take out `[WIP]` 
and let it be merged.


---




[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...

2017-09-18 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19229
  
Btw, I don't see any hint that `df.withColumn(..).withColumn(..).withColumn(..)` can prevent shuffle re-use.
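
For illustration, a minimal sketch of the two shapes being compared (assuming a DataFrame `df` with numeric input columns `a` and `b` and precomputed surrogate values; the names and the `coalesce` logic are hypothetical, not the actual Imputer code):

```
import org.apache.spark.sql.functions.{coalesce, col, lit}

val meanA = 1.0   // placeholder surrogate values, not computed here
val meanB = 2.0

// One withColumn call per output column, chained.
val chained = df
  .withColumn("a_imputed", coalesce(col("a"), lit(meanA)))
  .withColumn("b_imputed", coalesce(col("b"), lit(meanB)))

// A single projection that adds all output columns at once.
val singleProjection = df.select(
  col("*"),
  coalesce(col("a"), lit(meanA)).as("a_imputed"),
  coalesce(col("b"), lit(meanB)).as("b_imputed"))
```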


---




[GitHub] spark issue #19261: [SPARK-22040] Add current_date function with timezone id

2017-09-18 Thread jaceklaskowski
Github user jaceklaskowski commented on the issue:

https://github.com/apache/spark/pull/19261
  
@gatorsmile Dunno, but the logical operator does.


---




[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-09-18 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/17819
  
@WeichenXu123 I'm OK with that, but I think adding an interface doesn't break binary compatibility?


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19265
  
**[Test build #81879 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81879/testReport)**
 for PR 19265 at commit 
[`d395780`](https://github.com/apache/spark/commit/d3957809ad2bf720686fe6068b2cb6d530bf6845).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19265
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19265
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81879/
Test PASSed.


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19265
  
**[Test build #81880 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81880/testReport)**
 for PR 19265 at commit 
[`d395780`](https://github.com/apache/spark/commit/d3957809ad2bf720686fe6068b2cb6d530bf6845).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19265
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19265
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81880/
Test PASSed.


---




[GitHub] spark issue #18805: [SPARK-19112][CORE] Support for ZStandard codec

2017-09-18 Thread tgravescs
Github user tgravescs commented on the issue:

https://github.com/apache/spark/pull/18805
  
It looks like zstd-jni has now been updated to pull in 1.3.1.


---




[GitHub] spark pull request #19238: [SPARK-22016][SQL] Add HiveDialect for JDBC conne...

2017-09-18 Thread danielfx90
Github user danielfx90 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19238#discussion_r139441849
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
---
@@ -1103,6 +1103,17 @@ class JDBCSuite extends SparkFunSuite
""".stripMargin)
 
   val df3 = sql("SELECT * FROM test_sessionInitStatement")
-  assert(df3.collect() === Array(Row(21519, 1234)))
-}
+  assert(df3.collect() === Array(Row(21519, 1234))
+)
--- End diff --

@dongjoon-hyun You are right! I misread the parentheses. I think it is correct now. Thank you for the observation :)


---




[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-09-18 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/17819
  
@viirya I think it is possible. A similar example is the `HasRegParam` trait: `setRegParam` is not put in the trait but moved into the concrete estimator/transformer class, presumably for the same reason.
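
For illustration, a minimal sketch of that pattern with hypothetical names (not the actual `HasRegParam` code): the shared param lives in the trait, while the setter is defined only on the concrete class, so implementors are not tied to methods later added to the trait.

```
trait HasThreshold {
  // shared param definition only; no setter here
  def threshold: Double
}

class MyBucketizerLike(override val threshold: Double) extends HasThreshold {
  // setter-style method kept on the concrete class, not on the trait
  def setThreshold(value: Double): MyBucketizerLike = new MyBucketizerLike(value)
}
```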


---




[GitHub] spark issue #18853: [SPARK-21646][SQL] CommonType for binary comparison

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18853
  
**[Test build #81876 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81876/testReport)**
 for PR 18853 at commit 
[`844aec7`](https://github.com/apache/spark/commit/844aec7f4022140921d485b89fc240846bd05ac3).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #18853: [SPARK-21646][SQL] CommonType for binary comparison

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18853
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81876/
Test PASSed.


---




[GitHub] spark issue #18853: [SPARK-21646][SQL] CommonType for binary comparison

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18853
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19230
  
**[Test build #81877 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81877/testReport)**
 for PR 19230 at commit 
[`5ea4e89`](https://github.com/apache/spark/commit/5ea4e8985002dd26922376d672af4db052063909).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19230
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81877/
Test PASSed.


---




[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19230
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #19266: [SPARK-22033][CORE] BufferHolder, other size chec...

2017-09-18 Thread srowen
GitHub user srowen opened a pull request:

https://github.com/apache/spark/pull/19266

[SPARK-22033][CORE] BufferHolder, other size checks should account for the 
specific VM array size limitations

## What changes were proposed in this pull request?

Try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which 
is the actual max size on some JVMs, in several places

## How was this patch tested?

Existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srowen/spark SPARK-22033

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19266.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19266


commit 8dbfa30848b49a9fe4038c327222ddc9fd9a0ec0
Author: Sean Owen 
Date:   2017-09-18T14:59:40Z

Try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which 
is the actual max size on some JVMs, in several places




---




[GitHub] spark issue #19266: [SPARK-22033][CORE] BufferHolder, other size checks shou...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19266
  
**[Test build #81882 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81882/testReport)**
 for PR 19266 at commit 
[`8dbfa30`](https://github.com/apache/spark/commit/8dbfa30848b49a9fe4038c327222ddc9fd9a0ec0).


---




[GitHub] spark issue #19266: [SPARK-22033][CORE] BufferHolder, other size checks shou...

2017-09-18 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19266
  
CC @maropu 


---




[GitHub] spark issue #19254: [MINOR][CORE] Cleanup dead code and duplication in Mem. ...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19254
  
**[Test build #3925 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3925/testReport)**
 for PR 19254 at commit 
[`6b408a0`](https://github.com/apache/spark/commit/6b408a0314c4cf9fbd371712175c95942aae2a89).


---




[GitHub] spark pull request #18945: Add option to convert nullable int columns to flo...

2017-09-18 Thread a10y
Github user a10y commented on a diff in the pull request:

https://github.com/apache/spark/pull/18945#discussion_r139450187
  
--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1810,17 +1810,20 @@ def _to_scala_map(sc, jm):
 return sc._jvm.PythonUtils.toScalaMap(jm)
 
 
-def _to_corrected_pandas_type(dt):
+def _to_corrected_pandas_type(field, strict=True):
 """
 When converting Spark SQL records to Pandas DataFrame, the inferred 
data type may be wrong.
 This method gets the corrected data type for Pandas if that type may 
be inferred uncorrectly.
 """
 import numpy as np
+dt = field.dataType
 if type(dt) == ByteType:
 return np.int8
 elif type(dt) == ShortType:
 return np.int16
 elif type(dt) == IntegerType:
+if not strict and field.nullable:
+return np.float32
--- End diff --

Is loss of precision a concern here? Some integers from the original 
dataset will now be rounded to the nearest representable float32 if I'm not 
mistaken.
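
For reference, a small self-contained illustration of that rounding (the value is just an example, not taken from the patch): a 32-bit float has a 24-bit significand, so some integers above 2^24 cannot be represented exactly.

```
val i: Int = 16777217        // 2^24 + 1
val f: Float = i.toFloat     // rounds to 16777216.0f
println(f.toInt == i)        // false: the original integer is lost
```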


---




[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...

2017-09-18 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/19229
  
That doesn't look like the reason; maybe the issue is somewhere else. Let me run the test later. Thanks!
But there are some small issues in the test:
Don't include the data-generation time in the timed interval:
```
val start = System.nanoTime()
val df2 = genData()
model.transform(df2).count
val end = System.nanoTime()
```
and add caching at the end of genData:
```
def genData() = {
  val df = spark.createDataFrame...
  df.cache()
  df.count() // force trigger cache
  df
}
```
and we'd better add warm-up code before recording the running time.
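
Putting those suggestions together, a rough sketch of the adjusted measurement (reusing `genData()` and `model` from above; this is only an outline, not the actual benchmark code):

```
val df2 = genData()            // already cached and materialized inside genData()

model.transform(df2).count()   // warm-up run, not timed

val start = System.nanoTime()
model.transform(df2).count()   // timed run covers only the transform
val end = System.nanoTime()
println(s"transform took ${(end - start) / 1e9} s")
```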


---




[GitHub] spark issue #12646: [SPARK-14878][SQL] Trim characters string function suppo...

2017-09-18 Thread kevinyu98
Github user kevinyu98 commented on the issue:

https://github.com/apache/spark/pull/12646
  
Can we retest this? The unknown return code is not related to this code change. Thanks.


---




[GitHub] spark issue #19135: [SPARK-21923][CORE]Avoid calling reserveUnrollMemoryForT...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19135
  
**[Test build #81878 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81878/testReport)**
 for PR 19135 at commit 
[`e1dc7a4`](https://github.com/apache/spark/commit/e1dc7a46afb99fba9b489e6a5359bc347799e876).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19135: [SPARK-21923][CORE]Avoid calling reserveUnrollMemoryForT...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19135
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19135: [SPARK-21923][CORE]Avoid calling reserveUnrollMemoryForT...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19135
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81878/
Test PASSed.


---




[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18704
  
**[Test build #81883 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81883/testReport)**
 for PR 18704 at commit 
[`bdecaaf`](https://github.com/apache/spark/commit/bdecaaf045bb135239d09919af92f718e5d0b2c5).


---




[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15544
  
**[Test build #81881 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81881/testReport)**
 for PR 15544 at commit 
[`0e9a8dd`](https://github.com/apache/spark/commit/0e9a8dd4004c3331271105e4e9a3df963c3bc84b).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19210: [SPARK-22030][CORE] GraphiteSink fails to re-connect to ...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19210
  
**[Test build #81875 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81875/testReport)**
 for PR 19210 at commit 
[`0458123`](https://github.com/apache/spark/commit/0458123c1b3827f8b4b55eeb8bd5f7dbc749a4aa).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15544
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81881/
Test FAILed.


---




[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15544
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19210: [SPARK-22030][CORE] GraphiteSink fails to re-connect to ...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19210
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81875/
Test PASSed.


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139458303
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
   /** When `droppedEventsCounter` was logged last time in milliseconds. */
   @volatile private var lastReportTimestamp = 0L
 
-  // Indicate if we are processing some event
-  // Guarded by `self`
-  private var processingEvent = false
-
-  private val logDroppedEvent = new AtomicBoolean(false)
-
-  // A counter that represents the number of events produced and consumed 
in the queue
-  private val eventLock = new Semaphore(0)
-
-  private val listenerThread = new Thread(name) {
-setDaemon(true)
-override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
-  LiveListenerBus.withinListenerThread.withValue(true) {
-val timer = metrics.eventProcessingTime
-while (true) {
-  eventLock.acquire()
-  self.synchronized {
-processingEvent = true
-  }
-  try {
-val event = eventQueue.poll
-if (event == null) {
-  // Get out of the while loop and shutdown the daemon thread
-  if (!stopped.get) {
-throw new IllegalStateException("Polling `null` from 
eventQueue means" +
-  " the listener bus has been stopped. So `stopped` must 
be true")
-  }
-  return
-}
-val timerContext = timer.time()
-try {
-  postToAll(event)
-} finally {
-  timerContext.stop()
-}
-  } finally {
-self.synchronized {
-  processingEvent = false
-}
-  }
+  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
+
+  /** Add a listener to queue shared by all non-internal listeners. */
+  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, SHARED_QUEUE)
+  }
+
+  /** Add a listener to the executor management queue. */
+  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, EXECUTOR_MGMT_QUEUE)
+  }
+
+  /** Add a listener to the application status queue. */
+  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, APP_STATUS_QUEUE)
+  }
+
+  /** Add a listener to the event log queue. */
+  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, EVENT_LOG_QUEUE)
+  }
+
+  /**
+   * Add a listener to a specific queue, creating a new queue if needed. 
Queues are independent
+   * of each other (each one uses a separate thread for delivering 
events), allowing slower
+   * listeners to be somewhat isolated from others.
+   */
+  private def addToQueue(listener: SparkListenerInterface, queue: String): 
Unit = synchronized {
+if (stopped.get()) {
+  throw new IllegalStateException("LiveListenerBus is stopped.")
--- End diff --

No. It's ok and actually expected to add listeners before the bus is 
started.


---




[GitHub] spark issue #19234: [SPARK-22010][PySpark] Change fromInternal method of Tim...

2017-09-18 Thread maver1ck
Github user maver1ck commented on the issue:

https://github.com/apache/spark/pull/19234
  
OK. It passed all tests, so let's merge it.


---




[GitHub] spark issue #19266: [SPARK-22033][CORE] BufferHolder, other size checks shou...

2017-09-18 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/19266
  
I thought, since this limit depends heavily on JVM implementations, it would be better to put the limit in a global variable somewhere (e.g., `ARRAY_INT_MAX` in `spark.util.Utils` or another place)? As another option, how about adding an internal option in `SparkConf` for that?

Also, how about making a parent JIRA ticket to track the similar issues, because it seems difficult to cover all the possible places in this PR.


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139458812
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
   /** When `droppedEventsCounter` was logged last time in milliseconds. */
   @volatile private var lastReportTimestamp = 0L
 
-  // Indicate if we are processing some event
-  // Guarded by `self`
-  private var processingEvent = false
-
-  private val logDroppedEvent = new AtomicBoolean(false)
-
-  // A counter that represents the number of events produced and consumed 
in the queue
-  private val eventLock = new Semaphore(0)
-
-  private val listenerThread = new Thread(name) {
-setDaemon(true)
-override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
-  LiveListenerBus.withinListenerThread.withValue(true) {
-val timer = metrics.eventProcessingTime
-while (true) {
-  eventLock.acquire()
-  self.synchronized {
-processingEvent = true
-  }
-  try {
-val event = eventQueue.poll
-if (event == null) {
-  // Get out of the while loop and shutdown the daemon thread
-  if (!stopped.get) {
-throw new IllegalStateException("Polling `null` from 
eventQueue means" +
-  " the listener bus has been stopped. So `stopped` must 
be true")
-  }
-  return
-}
-val timerContext = timer.time()
-try {
-  postToAll(event)
-} finally {
-  timerContext.stop()
-}
-  } finally {
-self.synchronized {
-  processingEvent = false
-}
-  }
+  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
+
+  /** Add a listener to queue shared by all non-internal listeners. */
+  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, SHARED_QUEUE)
+  }
+
+  /** Add a listener to the executor management queue. */
+  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, EXECUTOR_MGMT_QUEUE)
+  }
+
+  /** Add a listener to the application status queue. */
+  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, APP_STATUS_QUEUE)
+  }
+
+  /** Add a listener to the event log queue. */
+  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, EVENT_LOG_QUEUE)
+  }
+
+  /**
+   * Add a listener to a specific queue, creating a new queue if needed. 
Queues are independent
+   * of each other (each one uses a separate thread for delivering 
events), allowing slower
+   * listeners to be somewhat isolated from others.
+   */
+  private def addToQueue(listener: SparkListenerInterface, queue: String): 
Unit = synchronized {
+if (stopped.get()) {
+  throw new IllegalStateException("LiveListenerBus is stopped.")
+}
+
+queues.asScala.find(_.name == queue) match {
+  case Some(queue) =>
+queue.addListener(listener)
+
+  case None =>
+val newQueue = new AsyncEventQueue(queue, conf, metrics)
+newQueue.addListener(listener)
+if (started.get() && !stopped.get()) {
+  newQueue.start(sparkContext)
 }
-  }
+queues.add(newQueue)
 }
   }
 
-  override protected def getTimer(listener: SparkListenerInterface): 
Option[Timer] = {
-
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
+  def removeListener(listener: SparkListenerInterface): Unit = 
synchronized {
+// Remove listener from all queues it was added to, and stop queues 
that have become empty.
+queues.asScala
+  .filter { queue =>
+queue.removeListener(listener)
+queue.listeners.isEmpty()
+  }
+  .foreach { toRemove =>
+if (started.get() && !stopped.get()) {
+  toRemove.stop()
--- End diff --

That would waste resources in the normal case.

The default configuration would never add anything to the default and event 
log queues, so you'd have two threads dispatching events to no listeners.

Stopping the queue when it empties, while a rare case, handles when users 
add listeners and later remove them, potentially leaving the default queue 
empty. Singling out the default queue here just complicates the 

[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139458935
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
   /** When `droppedEventsCounter` was logged last time in milliseconds. */
   @volatile private var lastReportTimestamp = 0L
 
-  // Indicate if we are processing some event
-  // Guarded by `self`
-  private var processingEvent = false
-
-  private val logDroppedEvent = new AtomicBoolean(false)
-
-  // A counter that represents the number of events produced and consumed 
in the queue
-  private val eventLock = new Semaphore(0)
-
-  private val listenerThread = new Thread(name) {
-setDaemon(true)
-override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
-  LiveListenerBus.withinListenerThread.withValue(true) {
-val timer = metrics.eventProcessingTime
-while (true) {
-  eventLock.acquire()
-  self.synchronized {
-processingEvent = true
-  }
-  try {
-val event = eventQueue.poll
-if (event == null) {
-  // Get out of the while loop and shutdown the daemon thread
-  if (!stopped.get) {
-throw new IllegalStateException("Polling `null` from 
eventQueue means" +
-  " the listener bus has been stopped. So `stopped` must 
be true")
-  }
-  return
-}
-val timerContext = timer.time()
-try {
-  postToAll(event)
-} finally {
-  timerContext.stop()
-}
-  } finally {
-self.synchronized {
-  processingEvent = false
-}
-  }
+  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
+
+  /** Add a listener to queue shared by all non-internal listeners. */
+  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, SHARED_QUEUE)
+  }
+
+  /** Add a listener to the executor management queue. */
+  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, EXECUTOR_MGMT_QUEUE)
+  }
+
+  /** Add a listener to the application status queue. */
+  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, APP_STATUS_QUEUE)
+  }
+
+  /** Add a listener to the event log queue. */
+  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
+addToQueue(listener, EVENT_LOG_QUEUE)
+  }
+
+  /**
+   * Add a listener to a specific queue, creating a new queue if needed. 
Queues are independent
+   * of each other (each one uses a separate thread for delivering 
events), allowing slower
+   * listeners to be somewhat isolated from others.
+   */
+  private def addToQueue(listener: SparkListenerInterface, queue: String): 
Unit = synchronized {
+if (stopped.get()) {
+  throw new IllegalStateException("LiveListenerBus is stopped.")
+}
+
+queues.asScala.find(_.name == queue) match {
+  case Some(queue) =>
+queue.addListener(listener)
+
+  case None =>
+val newQueue = new AsyncEventQueue(queue, conf, metrics)
+newQueue.addListener(listener)
+if (started.get() && !stopped.get()) {
+  newQueue.start(sparkContext)
 }
-  }
+queues.add(newQueue)
 }
   }
 
-  override protected def getTimer(listener: SparkListenerInterface): 
Option[Timer] = {
-
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
+  def removeListener(listener: SparkListenerInterface): Unit = 
synchronized {
+// Remove listener from all queues it was added to, and stop queues 
that have become empty.
+queues.asScala
+  .filter { queue =>
+queue.removeListener(listener)
+queue.listeners.isEmpty()
+  }
+  .foreach { toRemove =>
+if (started.get() && !stopped.get()) {
+  toRemove.stop()
+}
+queues.remove(toRemove)
+  }
+  }
+
+  /** Post an event to all queues. */
+  def post(event: SparkListenerEvent): Unit = {
+if (!stopped.get()) {
--- End diff --

No, as the class javadoc says, it's ok to send events before the bus is 
started.


---


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-09-18 Thread akopich
Github user akopich commented on the issue:

https://github.com/apache/spark/pull/18924
  
Ping @jkbradley .

Thank you @WeichenXu123 once again for the comment! Please have a look.


---




[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19211
  
**[Test build #81884 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81884/testReport)**
 for PR 19211 at commit 
[`77dd8ec`](https://github.com/apache/spark/commit/77dd8ecb549b8f5d7b7b20ac7384b8f56f1df67e).


---




[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18924
  
**[Test build #81885 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81885/testReport)**
 for PR 18924 at commit 
[`9ce9655`](https://github.com/apache/spark/commit/9ce9655d2275454e016dd3f3b640e578f4b6e0e4).


---




[GitHub] spark issue #12646: [SPARK-14878][SQL] Trim characters string function suppo...

2017-09-18 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/12646
  
retest this please


---




[GitHub] spark issue #12646: [SPARK-14878][SQL] Trim characters string function suppo...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/12646
  
**[Test build #81886 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81886/testReport)**
 for PR 12646 at commit 
[`79846bf`](https://github.com/apache/spark/commit/79846bfd86c6265ebe7d14906853bfe2ec0467f3).


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-18 Thread brkyvz
Github user brkyvz commented on the issue:

https://github.com/apache/spark/pull/19196
  
retest this please


---




[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...

2017-09-18 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19230
  
Thanks! Merged to master.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19196
  
**[Test build #81887 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81887/testReport)**
 for PR 19196 at commit 
[`be39125`](https://github.com/apache/spark/commit/be3912552d200fced24f69c436b59a2c24639380).


---




[GitHub] spark pull request #19230: [SPARK-22003][SQL] support array column in vector...

2017-09-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19230


---




[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...

2017-09-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18853#discussion_r139464231
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -925,6 +925,12 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val BINARY_COMPARISON_COMPATIBLE_WITH_HIVE =
+buildConf("spark.sql.binary.comparison.compatible.with.hive")
+  .doc("Whether compatible with Hive when binary comparison.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

This has to be `false`.


---




[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...

2017-09-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18853#discussion_r139464467
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -925,6 +925,12 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val BINARY_COMPARISON_COMPATIBLE_WITH_HIVE =
+buildConf("spark.sql.binary.comparison.compatible.with.hive")
--- End diff --

-> `spark.sql.autoTypeCastingCompatibility`


---




[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...

2017-09-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18853#discussion_r139464749
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -925,6 +925,12 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val BINARY_COMPARISON_COMPATIBLE_WITH_HIVE =
+buildConf("spark.sql.binary.comparison.compatible.with.hive")
+  .doc("Whether compatible with Hive when binary comparison.")
--- End diff --

Binary comparison is just one of the implicit type casting cases. 
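
Taken together, the suggestions in these review comments would make the entry look roughly like this (the doc string below is only illustrative, not proposed wording):

```
val AUTO_TYPE_CASTING_COMPATIBILITY =
  buildConf("spark.sql.autoTypeCastingCompatibility")
    .doc("Whether implicit type casting (binary comparison is only one such " +
      "case) follows Hive-compatible rules.")
    .booleanConf
    .createWithDefault(false)
```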


---




[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...

2017-09-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18853#discussion_r139465565
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -352,11 +374,16 @@ object TypeCoercion {
 p.makeCopy(Array(Cast(left, TimestampType), right))
   case p @ Equality(left @ TimestampType(), right @ StringType()) =>
 p.makeCopy(Array(left, Cast(right, TimestampType)))
-
   case p @ BinaryComparison(left, right)
-if findCommonTypeForBinaryComparison(left.dataType, 
right.dataType).isDefined =>
+if !plan.conf.binaryComparisonCompatibleWithHive &&
+  findCommonTypeForBinaryComparison(left.dataType, 
right.dataType).isDefined =>
 val commonType = findCommonTypeForBinaryComparison(left.dataType, 
right.dataType).get
 p.makeCopy(Array(castExpr(left, commonType), castExpr(right, 
commonType)))
+  case p @ BinaryComparison(left, right)
+if plan.conf.binaryComparisonCompatibleWithHive &&
--- End diff --

This is hard to maintain and debug. Instead of mixing them together, could 
you separate it from the current rule Spark uses? 


---




[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...

2017-09-18 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/19229
  
@viirya I ran the code, and you're right: most of the time is spent on the executedPlan generation (in the old version of the code). Thanks!
But can you add a benchmark comparison with the `RDD.aggregate` version?
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala#L102


---




[GitHub] spark issue #19266: [SPARK-22033][CORE] BufferHolder, other size checks shou...

2017-09-18 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19266
  
Yeah, agree, it could be some global constant. I don't think it should be configurable. Ideally it would be determined from the JVM, but I don't know a way to do that.

In many cases, assuming Int.MaxValue is the max array size when it's 
Int.MaxValue-8 doesn't matter much. For example, arguably I should leave the ML 
changes alone here, because, in the very rare case that a matrix size is 
somewhere between Int.MaxValue-8 and Int.MaxValue, it will fail anyway, and 
it's not avoidable given the user input. It's also, maybe, more conservative to 
not always assume anything beyond Int.MaxValue-8 is going to fail, and not 
"proactively" fail at this cutoff.

However, I think there are a smallish number of identifiable cases where Spark can very much avoid the failure (like BufferHolder), and they're the instances where an array size keeps doubling. Maybe we can stick to those clear cases, especially any that seem to have triggered the original error?

Those cases are few enough and related enough that I'm sure they're just 
one issue, not several.
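
For the buffer-doubling cases mentioned here, a minimal sketch of the kind of check being discussed (the object and method names are hypothetical, not the actual patch):

```
object ArrayLimits {
  // Largest array size that is safe to request on common JVMs; some VMs
  // reserve a few header words, so Int.MaxValue itself can fail.
  val MAX_ARRAY_SIZE: Int = Int.MaxValue - 8

  /** Choose a new capacity for a growing buffer, capping at the VM limit. */
  def grownCapacity(current: Int, required: Int): Int = {
    if (required > MAX_ARRAY_SIZE) {
      throw new UnsupportedOperationException(
        s"Cannot grow buffer past $MAX_ARRAY_SIZE bytes (need $required)")
    }
    // Double the current size, but never beyond the cap.
    math.min(math.max(current * 2L, required.toLong), MAX_ARRAY_SIZE.toLong).toInt
  }
}
```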


---




[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19250
  
**[Test build #81888 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81888/testReport)**
 for PR 19250 at commit 
[`950d33a`](https://github.com/apache/spark/commit/950d33a4835e56748963dfe002bfa7145d91469f).


---




[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r139467580
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -624,7 +639,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
   }
 
   private def createTestConf(): SparkConf = {
-new SparkConf().set("spark.history.fs.logDirectory", 
testDir.getAbsolutePath())
+new SparkConf()
+  .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+  .set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath())
--- End diff --

The `kvstore` module already tests both in-memory and disk stores 
extensively (running the same set of tests on both). There isn't really 
anything that testing both here can add to those tests.

At most I could make `FsHistoryProvider` do round-robin between disk and 
in-memory; but I don't see the point of running these tests twice.


---




[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r139467662
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -74,6 +76,7 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
   .set("spark.history.fs.logDirectory", logDir)
   .set("spark.history.fs.update.interval", "0")
   .set("spark.testing", "true")
+  .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
--- End diff --

See previous reply.


---




[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r139468080
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -720,19 +633,67 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   appId: String,
   attemptId: Option[String],
   prevFileSize: Long)(): Boolean = {
-lookup(appId, attemptId) match {
-  case None =>
-logDebug(s"Application Attempt $appId/$attemptId not found")
-false
-  case Some(latest) =>
-prevFileSize < latest.fileSize
+try {
+  val attempt = getAttempt(appId, attemptId)
+  val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
+  recordedFileSize(logPath) > prevFileSize
+} catch {
+  case _: NoSuchElementException => false
+}
+  }
+
+  /**
+   * Return the last known size of the given event log, recorded the last 
time the file
+   * system scanner detected a change in the file.
+   */
+  private def recordedFileSize(log: Path): Long = {
+try {
+  listing.read(classOf[LogInfo], log.toString()).fileSize
+} catch {
+  case _: NoSuchElementException => 0L
 }
   }
+
+  private def load(appId: String): ApplicationInfoWrapper = {
+listing.read(classOf[ApplicationInfoWrapper], appId)
+  }
+
+  /**
+   * Write the app's information to the given store. Serialized to avoid 
the (notedly rare) case
+   * where two threads are processing separate attempts of the same 
application.
+   */
+  private def addListing(app: ApplicationInfoWrapper): Unit = 
listing.synchronized {
+val attempt = app.attempts.head
+
+val oldApp = try {
+  listing.read(classOf[ApplicationInfoWrapper], app.id)
+} catch {
+  case _: NoSuchElementException =>
+app
+}
+
+def compareAttemptInfo(a1: AttemptInfoWrapper, a2: 
AttemptInfoWrapper): Boolean = {
+  a1.info.startTime.getTime() > a2.info.startTime.getTime()
+}
+
+val attempts = oldApp.attempts.filter(_.info.attemptId != 
attempt.info.attemptId) ++
+  List(attempt)
+
+val newAppInfo = new ApplicationInfoWrapper(
+  app.info,
+  attempts.sortWith(compareAttemptInfo))
+listing.write(newAppInfo)
--- End diff --

Yes.


---




[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r139468045
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -720,19 +633,67 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   appId: String,
   attemptId: Option[String],
   prevFileSize: Long)(): Boolean = {
-lookup(appId, attemptId) match {
-  case None =>
-logDebug(s"Application Attempt $appId/$attemptId not found")
-false
-  case Some(latest) =>
-prevFileSize < latest.fileSize
+try {
+  val attempt = getAttempt(appId, attemptId)
+  val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
+  recordedFileSize(logPath) > prevFileSize
+} catch {
+  case _: NoSuchElementException => false
+}
+  }
+
+  /**
+   * Return the last known size of the given event log, recorded the last 
time the file
+   * system scanner detected a change in the file.
+   */
+  private def recordedFileSize(log: Path): Long = {
+try {
+  listing.read(classOf[LogInfo], log.toString()).fileSize
+} catch {
+  case _: NoSuchElementException => 0L
 }
   }
+
+  private def load(appId: String): ApplicationInfoWrapper = {
+listing.read(classOf[ApplicationInfoWrapper], appId)
+  }
+
+  /**
+   * Write the app's information to the given store. Serialized to avoid 
the (notedly rare) case
+   * where two threads are processing separate attempts of the same 
application.
+   */
+  private def addListing(app: ApplicationInfoWrapper): Unit = 
listing.synchronized {
+val attempt = app.attempts.head
--- End diff --

Because the listener processes a single event log; one event log == one 
attempt.


---




[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r139468324
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -720,19 +633,67 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   appId: String,
   attemptId: Option[String],
   prevFileSize: Long)(): Boolean = {
-lookup(appId, attemptId) match {
-  case None =>
-logDebug(s"Application Attempt $appId/$attemptId not found")
-false
-  case Some(latest) =>
-prevFileSize < latest.fileSize
+try {
+  val attempt = getAttempt(appId, attemptId)
+  val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
+  recordedFileSize(logPath) > prevFileSize
+} catch {
+  case _: NoSuchElementException => false
+}
+  }
+
+  /**
+   * Return the last known size of the given event log, recorded the last 
time the file
+   * system scanner detected a change in the file.
+   */
+  private def recordedFileSize(log: Path): Long = {
+try {
+  listing.read(classOf[LogInfo], log.toString()).fileSize
+} catch {
+  case _: NoSuchElementException => 0L
 }
   }
+
+  private def load(appId: String): ApplicationInfoWrapper = {
+listing.read(classOf[ApplicationInfoWrapper], appId)
+  }
+
+  /**
+   * Write the app's information to the given store. Serialized to avoid 
the (notedly rare) case
+   * where two threads are processing separate attempts of the same 
application.
+   */
+  private def addListing(app: ApplicationInfoWrapper): Unit = 
listing.synchronized {
+val attempt = app.attempts.head
+
+val oldApp = try {
+  listing.read(classOf[ApplicationInfoWrapper], app.id)
+} catch {
+  case _: NoSuchElementException =>
+app
+}
+
+def compareAttemptInfo(a1: AttemptInfoWrapper, a2: 
AttemptInfoWrapper): Boolean = {
+  a1.info.startTime.getTime() > a2.info.startTime.getTime()
+}
+
+val attempts = oldApp.attempts.filter(_.info.attemptId != 
attempt.info.attemptId) ++
+  List(attempt)
+
+val newAppInfo = new ApplicationInfoWrapper(
+  app.info,
+  attempts.sortWith(compareAttemptInfo))
+listing.write(newAppInfo)
+  }
+
+  /** For testing. Returns internal data about a single attempt. */
+  private[history] def getAttempt(appId: String, attemptId: 
Option[String]): AttemptInfoWrapper = {
+load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse(
+  throw new NoSuchElementException(s"Cannot find attempt $attemptId of 
$appId."))
+  }
+
 }
 
 private[history] object FsHistoryProvider {
-  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
-
   private val NOT_STARTED = ""
--- End diff --

This existed for compatibility with really old event logs that 2.x does not 
support anymore.


---




[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r139468509
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -742,53 +703,150 @@ private[history] object FsHistoryProvider {
   private val APPL_END_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationEnd\""
 
   private val LOG_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerLogStart\""
+
+  /**
+   * Current version of the data written to the listing database. When 
opening an existing
+   * db, if the version does not match this value, the FsHistoryProvider 
will throw away
+   * all data and re-generate the listing data from the event logs.
+   */
+  private val CURRENT_LISTING_VERSION = 1L
 }
 
 /**
- * Application attempt information.
- *
- * @param logPath path to the log file, or, for a legacy log, its directory
- * @param name application name
- * @param appId application ID
- * @param attemptId optional attempt ID
- * @param startTime start time (from playback)
- * @param endTime end time (from playback). -1 if the application is 
incomplete.
- * @param lastUpdated the modification time of the log file when this 
entry was built by replaying
- *the history.
- * @param sparkUser user running the application
- * @param completed flag to indicate whether or not the application has 
completed.
- * @param fileSize the size of the log file the last time the file was 
scanned for changes
+ * A KVStoreSerializer that provides Scala types serialization too, and 
uses the same options as
+ * the API serializer.
  */
-private class FsApplicationAttemptInfo(
+private class KVStoreScalaSerializer extends KVStoreSerializer {
+
+  mapper.registerModule(DefaultScalaModule)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
+
+}
+
+private[history] case class KVStoreMetadata(
+  version: Long,
+  logDir: String)
+
+private[history] case class LogInfo(
+  @KVIndexParam logPath: String,
+  fileSize: Long)
+
+private[history] class AttemptInfoWrapper(
+val info: v1.ApplicationAttemptInfo,
 val logPath: String,
-val name: String,
-val appId: String,
-attemptId: Option[String],
-startTime: Long,
-endTime: Long,
-lastUpdated: Long,
-sparkUser: String,
-completed: Boolean,
-val fileSize: Long,
-appSparkVersion: String)
-  extends ApplicationAttemptInfo(
-  attemptId, startTime, endTime, lastUpdated, sparkUser, completed, 
appSparkVersion) {
-
-  /** extend the superclass string value with the extra attributes of this 
class */
-  override def toString: String = {
-s"FsApplicationAttemptInfo($name, $appId," +
-  s" ${super.toString}, source=$logPath, size=$fileSize"
+val fileSize: Long) {
+
+  def toAppAttemptInfo(): ApplicationAttemptInfo = {
+ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(),
+  info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser,
+  info.completed, info.appSparkVersion)
   }
+
 }
 
-/**
- * Application history information
- * @param id application ID
- * @param name application name
- * @param attempts list of attempts, most recent first.
- */
-private class FsApplicationHistoryInfo(
-id: String,
-override val name: String,
-override val attempts: List[FsApplicationAttemptInfo])
-  extends ApplicationHistoryInfo(id, name, attempts)
+private[history] class ApplicationInfoWrapper(
+val info: v1.ApplicationInfo,
+val attempts: List[AttemptInfoWrapper]) {
+
+  @JsonIgnore @KVIndexParam
+  def id: String = info.id
+
+  @JsonIgnore @KVIndexParam("endTime")
+  def endTime(): Long = attempts.head.info.endTime.getTime()
+
+  @JsonIgnore @KVIndexParam("oldestAttempt")
+  def oldestAttempt(): Long = 
attempts.map(_.info.lastUpdated.getTime()).min
+
+  def toAppHistoryInfo(): ApplicationHistoryInfo = {
+ApplicationHistoryInfo(info.id, info.name, 
attempts.map(_.toAppAttemptInfo()))
+  }
+
+  def toApiInfo(): v1.ApplicationInfo = {
+new v1.ApplicationInfo(info.id, info.name, info.coresGranted, 
info.maxCores,
+  info.coresPerExecutor, info.memoryPerExecutorMB, 
attempts.map(_.info))
+  }
+
+}
+
+private[history] class AppListingListener(log: FileStatus, clock: Clock) 
extends SparkListener {
+
+  private val app = new MutableApplicationInfo()
+  private val attempt = new MutableAttemptInfo(log.getPath().getName(), 
log.getLen())
+
+  override def 

[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18924
  
**[Test build #81885 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81885/testReport)**
 for PR 18924 at commit 
[`9ce9655`](https://github.com/apache/spark/commit/9ce9655d2275454e016dd3f3b640e578f4b6e0e4).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18924
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18924
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81885/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19222
  
**[Test build #81889 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81889/testReport)**
 for PR 19222 at commit 
[`7c2c0cb`](https://github.com/apache/spark/commit/7c2c0cb832101802a6c753d1f07d541284e602a7).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-18 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r139470472
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val logphatPartOptionBase = if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k)) else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]])] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase
--- End diff --

We cannot reference an outer variable and modify it inside the `map` function; it 
will produce an undefined result.
You need to create the `logphatPartOption` object inside the `map` function, like:
```
val localOptimizeDocConcentration = optimizeDocConcentration
batch.mapPartitions { docs =>
  ...
  val logphatPartOption = if (localOptimizeDocConcentration) 
Some(BDV.zeros[Double](k)) else None
  ...
```
Also note: avoid using `optimizeDocConcentration` directly inside the `rdd.map` 
function, because the var is a class member and referencing it causes the whole class object 
to be serialized.
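
A minimal sketch of that pattern, using an illustrative class (the field and method 
names below are placeholders, not the actual `OnlineLDAOptimizer` members): copy the 
class member into a local `val` before the closure so that only the value is captured, 
and create the per-partition accumulator inside `mapPartitions`.

```
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.rdd.RDD

// Illustrative class, not the real optimizer.
class ExampleOptimizer(optimizeDocConcentration: Boolean, k: Int) {

  def collectStats(batch: RDD[BDV[Double]]): Unit = {
    // Copy class members into local vals: the closure then captures a Boolean
    // and an Int instead of the whole (possibly non-serializable) instance.
    val localOptimize = optimizeDocConcentration
    val localK = k

    batch.mapPartitions { docs =>
      // Per-partition accumulator created inside the task, as suggested above.
      val logphatPart = if (localOptimize) Some(BDV.zeros[Double](localK)) else None
      docs.foreach(doc => logphatPart.foreach(_ += doc))
      Iterator.single(logphatPart)
    }.count()  // force evaluation just for the example
  }
}
```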




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-18 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r139467949
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val logphatPartOptionBase = if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k)) else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]])] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption))
+}
+
+val elementWiseSumInPlace = (u : (BDM[Double], Option[BDV[Double]]),
+ v : (BDM[Double], Option[BDV[Double]])) 
=> {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  u
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]]) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase))(
+elementWiseSumInPlace, elementWiseSumInPlace
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
+logphatOption.foreach(updateAlpha(_, batchSize))
+
+expElogbetaBc.destroy(false)
+stats.unpersist()
--- End diff --

This line is unnecessary now that `stats` is no longer persisted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19238: [SPARK-22016][SQL] Add HiveDialect for JDBC connection t...

2017-09-18 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19238
  
I can see the value, but it does not perform well in most cases when we use a 
JDBC connection. Instead of adding the extra dialect upstream, could you 
please add Hive as a separate data source? Thanks!

https://spark.apache.org/third-party-projects.html


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19256: [SPARK-21338][SQL]implement isCascadingTruncateTable() m...

2017-09-18 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19256
  
It looks good, but the actual code should be very simple if you write it 
in the idiomatic Scala way.
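
For illustration, a sketch of what the idiomatic form usually looks like in a 
`JdbcDialect` subclass; the stub below is hypothetical (the URL prefix and the chosen 
Boolean are placeholders, not this PR's values):

```
import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical dialect stub: the override itself is a one-liner.
case object ExampleDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:example")
  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
```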


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19261: [SPARK-22040] Add current_date function with timezone id

2017-09-18 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/19261
  
What does this even mean?



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19267: [WIP][SPARK-20628][CORE] Blacklist nodes when the...

2017-09-18 Thread juanrh
GitHub user juanrh opened a pull request:

https://github.com/apache/spark/pull/19267

[WIP][SPARK-20628][CORE] Blacklist nodes when they transition to 
DECOMMISSIONING state in YARN

## What changes were proposed in this pull request? 
Dynamic cluster configurations where cluster nodes are added and removed 
frequently are common in public cloud environments, so this has become a 
[problem for Spark 
users](https://www.trulia.com/blog/tech/aws-emr-ad-hoc-spark-development-environme...
 ). To cope with this we propose implementing a mechanism in the line of 
YARN’s support for [graceful node 
decommission](https://issues.apache.org/jira/browse/YARN-914 ) or [Mesos 
maintenance 
primitives](https://mesos.apache.org/documentation/latest/maintenance/ ). These 
changes allow cluster nodes to be transitioned to a ‘decommissioning’ 
state, at which point no more tasks will be scheduled on the executors running 
on those nodes. After a configurable drain time, nodes in the 
‘decommissioning’ state will transition to a ‘decommissioned’ state, 
where shuffle blocks are not available anymore. Shuffle blocks stored on nodes 
in the ‘decommissioning’ state are available to other executors. By 
preventing more tasks from 
 running on nodes in the ‘decommissioning’ state we avoid creating more 
shuffle blocks on those nodes, as those blocks won’t be available when nodes 
eventually transition to the ‘decommissioned’ state. 

We have implemented a first version of this proposal for YARN, using 
Spark’s [blacklisting mechanism for task 
scheduling](https://issues.apache.org/jira/browse/SPARK-8425 ) (available at 
the node level since Spark 2.2.0) to ensure tasks are not scheduled on nodes 
in the ‘decommissioning’ state. With this solution it is the cluster 
manager, not the Spark application, that tracks the status of the node, and 
handles the transition from ‘decommissioning’ to ‘decommissioned’. The 
Spark driver simply reacts to the node state transitions. 
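
To make the driver-side idea concrete, a very rough, hypothetical sketch follows; 
none of these names correspond to the classes actually touched in this PR, which wires 
the state changes through the YARN allocator and Spark's existing blacklist tracking.

```
// Hypothetical illustration only.
sealed trait NodeState
case object Running extends NodeState
case object Decommissioning extends NodeState
case object Decommissioned extends NodeState

class NodeStateBlacklister {
  private val blacklistedNodes = scala.collection.mutable.Set.empty[String]

  // Called when the cluster manager reports that a node changed state.
  def onNodeStateChange(host: String, state: NodeState): Unit = state match {
    case Decommissioning | Decommissioned =>
      // Stop scheduling new tasks here; shuffle blocks already written on the
      // host stay readable until it is fully decommissioned.
      blacklistedNodes += host
    case Running =>
      blacklistedNodes -= host
  }

  def isNodeBlacklisted(host: String): Boolean = blacklistedNodes.contains(host)
}
```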

## How was this patch tested? 
All functionality has been tested with unit tests, with an integration test 
based on the BaseYarnClusterSuite, and with manual testing on a cluster 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juanrh/spark 
SPARK-20628-yarn-decommissioning-blacklisting

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19267.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19267


commit 35285871a40fbf0903784dfb8dc16bd4e37062fe
Author: Juan Rodriguez Hortala 
Date:   2017-08-23T16:58:24Z

Send Host status update signals to YarnSchedulerBackend, on Yarn node state 
changes

commit 42891dafcfe7d25026719b6025c1272e7fe2d947
Author: Juan Rodriguez Hortala 
Date:   2017-08-23T18:49:04Z

Add mechanism to Blacklist/Unblacklist nodes based on Node status changes 
in Cluster Manager

commit 0c840c74a85012a4d91e349ae4830455ef3d680b
Author: Juan Rodriguez Hortala 
Date:   2017-08-23T19:56:31Z

Add configuration to independently enable/disable task execution 
blacklisting and decommissioning blacklisting

commit f9fdfb01ac2486cc268b13568d129f926b3b8ab2
Author: Juan Rodriguez Hortala 
Date:   2017-08-23T20:29:49Z

Integration test for Yarn node decommissioning




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19267: [WIP][SPARK-20628][CORE] Blacklist nodes when they trans...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19267
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18887
  
**[Test build #81890 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81890/testReport)**
 for PR 18887 at commit 
[`56b68a0`](https://github.com/apache/spark/commit/56b68a06e5fc41e423a10ed767c1eb50d62395f5).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19261: [SPARK-22040] Add current_date function with timezone id

2017-09-18 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19261
  
I think we should not do it, because no DB vendor does it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19268: Incorrect Metric reported in MetricsReporter.scal...

2017-09-18 Thread Taaffy
GitHub user Taaffy opened a pull request:

https://github.com/apache/spark/pull/19268

Incorrect Metric reported in MetricsReporter.scala

The current implementation of processingRate-total uses the wrong metric: it 
mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond.

## What changes were proposed in this pull request?
Adjust processingRate-total from using inputRowsPerSecond to 
processedRowsPerSecond
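
A sketch of the kind of one-line fix being described. The registration below only 
approximates the shape of MetricsReporter; the helper, the trait and the accessor are 
stand-ins for illustration, not the actual source:

```
import com.codahale.metrics.{Gauge, MetricRegistry}

// Stand-in for the streaming progress object, just to make the sketch compile.
trait ProgressLike {
  def inputRowsPerSecond: Double
  def processedRowsPerSecond: Double
}

class ProgressMetrics(lastProgress: () => ProgressLike) {
  val registry = new MetricRegistry

  private def registerGauge(name: String, value: () => Double): Unit = {
    registry.register(name, new Gauge[Double] { override def getValue: Double = value() })
  }

  registerGauge("inputRate-total", () => lastProgress().inputRowsPerSecond)
  // Before the fix this gauge also read inputRowsPerSecond, which is why the
  // two CSV files showed identical values.
  registerGauge("processingRate-total", () => lastProgress().processedRowsPerSecond)
}
```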

## How was this patch tested?

Built Spark from source with the proposed change and checked the output. Before the 
change, the CSV metrics files for inputRate-total and processingRate-total displayed 
the same values due to the error. After changing MetricsReporter.scala, the 
processingRate-total CSV file displayed the correct metric. 
https://user-images.githubusercontent.com/32072374/30554340-82eea12c-9ca4-11e7-8370-8168526ff9a2.png

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Taaffy/spark patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19268.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19268


commit afe083ff45313ed07cb95ded4c089ead7d80ecce
Author: Taaffy <32072374+taa...@users.noreply.github.com>
Date:   2017-09-18T16:56:51Z

Incorrect Metric reported in MetricsReporter.scala 

Current implementation for processingRate-total uses wrong metric:
mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19268: Incorrect Metric reported in MetricsReporter.scala

2017-09-18 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19268
  
Please make a JIRA @Taaffy 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19268: Incorrect Metric reported in MetricsReporter.scala

2017-09-18 Thread Taaffy
Github user Taaffy commented on the issue:

https://github.com/apache/spark/pull/19268
  
Will do. Should I delete this pull request afterwards?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19268: Incorrect Metric reported in MetricsReporter.scala

2017-09-18 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19268
  
No way to make the change without a PR, so no, leave it.
http://spark.apache.org/contributing.html


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19268: Incorrect Metric reported in MetricsReporter.scala

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19268
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19232: [SPARK-22009][ML] Using treeAggregate improve some algs

2017-09-18 Thread sethah
Github user sethah commented on the issue:

https://github.com/apache/spark/pull/19232
  
Sure, we all agree there is a mechanism for avoiding overhead. However, 
performance tests are tricky, 5% is not a huge improvement, and 
hard-coding the aggregation depth to `2` limits the utility of using 
`treeAggregate`. I think the change is probably fine, if only because 
`treeAggregate` shouldn't hurt performance and might speed things up. Still, I 
don't think there's enough information yet to determine under what 
circumstances, if any, this actually improves performance. 
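
For context, a minimal sketch of the two calls being compared. The data and the ops 
are arbitrary, but `depth = 2` is the real default of `treeAggregate`: it adds one 
intermediate combine level on the executors instead of sending every partition's 
partial result straight to the driver.

```
import org.apache.spark.sql.SparkSession

object TreeAggregateExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("treeAgg").getOrCreate()
    val data = spark.sparkContext.parallelize(1 to 1000000, numSlices = 200)

    // Plain aggregate: all 200 partial results are merged on the driver.
    val flat = data.aggregate(0L)((acc, x) => acc + x, _ + _)

    // treeAggregate: partials are first combined in an intermediate level of
    // reduce tasks (depth = 2 is the default), reducing driver-side merge cost.
    val tree = data.treeAggregate(0L)((acc, x) => acc + x, _ + _, depth = 2)

    assert(flat == tree)
    spark.stop()
  }
}
```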


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19268: Incorrect Metric reported in MetricsReporter.scala

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19268
  
**[Test build #3926 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3926/testReport)**
 for PR 19268 at commit 
[`afe083f`](https://github.com/apache/spark/commit/afe083ff45313ed07cb95ded4c089ead7d80ecce).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-09-18 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19218
  
- You can get it from the table metadata `table: CatalogTable` (see the sketch below).

- `InsertIntoHadoopFsRelationCommand.scala` is for data source tables. We 
only have this issue for Hive table writing, right?
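
A sketch of what reading it from the table metadata could look like; the property 
keys and the fallback order below are assumptions for illustration, not the PR's 
actual logic:

```
import org.apache.spark.sql.catalyst.catalog.CatalogTable

object CompressionResolution {
  // Hypothetical helper: prefer a codec set in the table's own properties, then
  // in its storage properties, then fall back to the session-level configuration.
  def resolveParquetCompression(table: CatalogTable, sessionDefault: String): String = {
    table.properties.get("parquet.compression")
      .orElse(table.storage.properties.get("parquet.compression"))
      .getOrElse(sessionDefault)
  }
}
```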


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-18 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/19269

[SPARK-22026][SQL][WIP] data source v2 write path

## What changes were proposed in this pull request?

A working prototype for data source v2 write path.

TODO: doc.

## How was this patch tested?

new tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark data-source-v2-write

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19269.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19269


commit 7b6a6b785bd56f9c7cec8999aa31c0427eb05e55
Author: Wenchen Fan 
Date:   2017-09-18T17:12:55Z

data source v2 write path




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19269
  
**[Test build #81891 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81891/testReport)**
 for PR 19269 at commit 
[`7b6a6b7`](https://github.com/apache/spark/commit/7b6a6b785bd56f9c7cec8999aa31c0427eb05e55).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19269
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19269
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81891/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19269
  
**[Test build #81891 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81891/testReport)**
 for PR 19269 at commit 
[`7b6a6b7`](https://github.com/apache/spark/commit/7b6a6b785bd56f9c7cec8999aa31c0427eb05e55).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, 
query: LogicalPlan)`
  * `class RowToUnsafeRowWriteTask(rowWriteTask: WriteTask[Row], schema: 
StructType)`
  * `class RowToUnsafeRowDataWriter(rowWriter: DataWriter[Row], encoder: 
ExpressionEncoder[Row])`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19266: [SPARK-22033][CORE] BufferHolder, other size chec...

2017-09-18 Thread buryat
Github user buryat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19266#discussion_r139489491
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -39,7 +39,7 @@
   private final long length;
 
   public LongArray(MemoryBlock memory) {
-assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 4 
billion elements";
+assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 2.1 
billion elements";
--- End diff --

`assert memory.size() < (long) (Integer.MAX_VALUE - 8) * 8`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19266: [SPARK-22033][CORE] BufferHolder, other size chec...

2017-09-18 Thread buryat
Github user buryat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19266#discussion_r139489658
  
--- Diff: 
core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java ---
@@ -30,11 +30,15 @@
   HashMapGrowthStrategy DOUBLING = new Doubling();
 
   class Doubling implements HashMapGrowthStrategy {
+
+private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
--- End diff --

Maybe worth adding a comment explaining why this value is chosen as the max, 
like here:

http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l223
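
Something along those lines could read as follows; a sketch of the requested comment 
(wording is mine, paraphrasing the JDK note linked above):

```
object Limits {
  /**
   * Maximum length to allocate for an array. Some JVMs reserve a few header
   * words in an array object, so asking for Integer.MAX_VALUE elements can
   * fail with "OutOfMemoryError: Requested array size exceeds VM limit" even
   * when enough heap is free; hence the - 8 slack.
   */
  val ARRAY_MAX: Int = Int.MaxValue - 8
}
```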


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19266: [SPARK-22033][CORE] BufferHolder, other size chec...

2017-09-18 Thread buryat
Github user buryat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19266#discussion_r139492024
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala ---
@@ -126,22 +126,20 @@ private[spark] class CompactBuffer[T: ClassTag] 
extends Seq[T] with Serializable
 
   /** Increase our size to newSize and grow the backing array if needed. */
   private def growToSize(newSize: Int): Unit = {
-if (newSize < 0) {
-  throw new UnsupportedOperationException("Can't grow buffer past 
Int.MaxValue elements")
+val arrayMax = Int.MaxValue - 8
+if (newSize < 0 || newSize - 2 > arrayMax) {
+  throw new UnsupportedOperationException(s"Can't grow buffer past 
$arrayMax elements")
 }
 val capacity = if (otherElements != null) otherElements.length + 2 
else 2
 if (newSize > capacity) {
-  var newArrayLen = 8
+  var newArrayLen = 8L
   while (newSize - 2 > newArrayLen) {
--- End diff --

I think we can remove `- 2` now since `newArrayLen` is a Long


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139493581
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, 
PythonEvalType, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: 
Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  override def producedAttributes: AttributeSet = 
AttributeSet(output.drop(child.output.length))
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
--- End diff --

Yes, these functions are duplicated, as well as some code in `doExecute()`. 
 I could add a common base class like `EvalPythonExec` to clean this up, and 
maybe move to the same file?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139496371
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-def _udf(f, returnType=StringType()):
-udf_obj = UserDefinedFunction(f, returnType)
-return udf_obj._wrapped()
+return _create_udf(f, returnType=returnType, vectorized=False)
 
-# decorator @udf, @udf() or @udf(dataType())
-if f is None or isinstance(f, (str, DataType)):
-# If DataType has been passed as a positional argument
-# for decorator use it as a returnType
-return_type = f or returnType
-return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+"""
+Creates a :class:`Column` expression representing a user defined 
function (UDF) that accepts
+`Pandas.Series` as input arguments and outputs a `Pandas.Series` of 
the same length.
+
+:param f: python function if used as a standalone function
+:param returnType: a :class:`pyspark.sql.types.DataType` object
+
+# TODO: doctest
+"""
+import inspect
+# If function "f" does not define the optional kwargs, then wrap with 
a kwargs placeholder
+if inspect.getargspec(f).keywords is None:
+return _create_udf(lambda *a, **kwargs: f(*a), 
returnType=returnType, vectorized=True)
--- End diff --

If the user function doesn't define the keyword args, then it is wrapped 
with a placeholder so that `worker.py` can expect the function to always have 
keywords.  I thought this was better than trying to do inspection on the worker 
while running the UDF.

I'm not crazy about the 0-parameter pandas_udf, but if we have to support 
it here then it does need to get the required length of output somehow, unless 
we repeat/slice the output to make the length correct.

I'm ok with making `**kwargs` mandatory for 0-parameter UDFs and optional 
for others.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18704
  
**[Test build #81883 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81883/testReport)**
 for PR 18704 at commit 
[`bdecaaf`](https://github.com/apache/spark/commit/bdecaaf045bb135239d09919af92f718e5d0b2c5).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18704
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81883/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...

2017-09-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18704
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19250
  
**[Test build #81888 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81888/testReport)**
 for PR 19250 at commit 
[`950d33a`](https://github.com/apache/spark/commit/950d33a4835e56748963dfe002bfa7145d91469f).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19266: [SPARK-22033][CORE] BufferHolder, other size checks shou...

2017-09-18 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19266
  
**[Test build #81882 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81882/testReport)**
 for PR 19266 at commit 
[`8dbfa30`](https://github.com/apache/spark/commit/8dbfa30848b49a9fe4038c327222ddc9fd9a0ec0).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


