[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/15544 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15544 **[Test build #81881 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81881/testReport)** for PR 15544 at commit [`0e9a8dd`](https://github.com/apache/spark/commit/0e9a8dd4004c3331271105e4e9a3df963c3bc84b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19234: [WIP][SPARK-22010][PySpark] Change fromInternal method o...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19234 Seems fine to me too as is. @maver1ck, I think you could take out `[WIP]` and let it be merged. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19229 Btw, I don't see any indication that `df.withColumn(..).withColumn(..).withColumn(..)` can prevent shuffle reuse. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
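For reference, a minimal sketch (not code from this PR) of the two call patterns being contrasted: one `withColumn` call per output column versus a single `select` that produces all of them. The column names and the transformation are made up for illustration.

```scala
// Hypothetical illustration of the two call patterns under discussion
// (not code from the ImputerModel PR).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object WithColumnPatterns {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("withColumn-patterns").getOrCreate()
    import spark.implicits._

    val df = Seq((1.0, 2.0, 3.0)).toDF("a", "b", "c")

    // Pattern 1: one withColumn call per output column, chained.
    val chained = df
      .withColumn("a_out", col("a") * 2)
      .withColumn("b_out", col("b") * 2)
      .withColumn("c_out", col("c") * 2)

    // Pattern 2: a single select producing all output columns at once.
    val selected = df.select(
      (col("a") * 2).as("a_out"),
      (col("b") * 2).as("b_out"),
      (col("c") * 2).as("c_out"))

    // Both produce the same logical result; the question raised in the review
    // is whether the chained form interferes with reusing shuffled exchanges.
    chained.explain()
    selected.explain()

    spark.stop()
  }
}
```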
[GitHub] spark issue #19261: [SPARK-22040] Add current_date function with timezone id
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/19261 @gatorsmile Dunno, but the logical operator does. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/17819 @WeichenXu123 I'm OK with that, but I think adding an interface doesn't break binary compatibility, does it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19265 **[Test build #81879 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81879/testReport)** for PR 19265 at commit [`d395780`](https://github.com/apache/spark/commit/d3957809ad2bf720686fe6068b2cb6d530bf6845). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19265 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19265 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81879/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19265 **[Test build #81880 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81880/testReport)** for PR 19265 at commit [`d395780`](https://github.com/apache/spark/commit/d3957809ad2bf720686fe6068b2cb6d530bf6845). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19265 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19265: [SPARK-22047][flaky test] HiveExternalCatalogVersionsSui...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19265 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81880/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18805: [SPARK-19112][CORE] Support for ZStandard codec
Github user tgravescs commented on the issue: https://github.com/apache/spark/pull/18805 It looks like zstd-jni has now been updated to pull 1.3.1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19238: [SPARK-22016][SQL] Add HiveDialect for JDBC conne...
Github user danielfx90 commented on a diff in the pull request: https://github.com/apache/spark/pull/19238#discussion_r139441849 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala --- @@ -1103,6 +1103,17 @@ class JDBCSuite extends SparkFunSuite """.stripMargin) val df3 = sql("SELECT * FROM test_sessionInitStatement") - assert(df3.collect() === Array(Row(21519, 1234))) -} + assert(df3.collect() === Array(Row(21519, 1234)) +) --- End diff -- @dongjoon-hyun You are right! I misread the parentheses. I think it is correct now. Thank you for the observation :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/17819 @viirya It is possible, I think. A similar example is the `HasRegParam` trait: `setRegParam` is not put in the trait but is moved into the concrete estimator/transformer classes, and it should be for the same reason. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
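A rough sketch of the pattern being described, with hypothetical names (this is not Spark's actual `HasRegParam` code): the shared trait declares only the value and a getter, while each concrete class defines its own setter so the setter's return type stays specific to that class.

```scala
// Illustrative-only sketch of the "setter lives in the concrete class" pattern;
// trait/class names are hypothetical, not Spark's real ones.
trait HasThreshold {
  // Shared trait declares the value and a getter only.
  protected var threshold: Double = 0.5
  def getThreshold: Double = threshold
}

class MyBucketizer extends HasThreshold {
  // The setter is defined on the concrete class so it can return the concrete
  // type, which keeps chained calls typed as MyBucketizer and avoids freezing
  // the setter's signature into the shared trait.
  def setThreshold(value: Double): this.type = {
    threshold = value
    this
  }
}

object HasThresholdDemo {
  def main(args: Array[String]): Unit = {
    val b = new MyBucketizer().setThreshold(0.8)
    println(b.getThreshold)
  }
}
```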
[GitHub] spark issue #18853: [SPARK-21646][SQL] CommonType for binary comparison
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18853 **[Test build #81876 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81876/testReport)** for PR 18853 at commit [`844aec7`](https://github.com/apache/spark/commit/844aec7f4022140921d485b89fc240846bd05ac3). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18853: [SPARK-21646][SQL] CommonType for binary comparison
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18853 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81876/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18853: [SPARK-21646][SQL] CommonType for binary comparison
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18853 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19230 **[Test build #81877 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81877/testReport)** for PR 19230 at commit [`5ea4e89`](https://github.com/apache/spark/commit/5ea4e8985002dd26922376d672af4db052063909). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19230 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81877/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19230 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19266: [SPARK-22033][CORE] BufferHolder, other size chec...
GitHub user srowen opened a pull request: https://github.com/apache/spark/pull/19266 [SPARK-22033][CORE] BufferHolder, other size checks should account for the specific VM array size limitations ## What changes were proposed in this pull request? Try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which is the actual max size on some JVMs, in several places ## How was this patch tested? Existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/srowen/spark SPARK-22033 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19266.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19266 commit 8dbfa30848b49a9fe4038c327222ddc9fd9a0ec0 Author: Sean Owen Date: 2017-09-18T14:59:40Z Try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which is the actual max size on some JVMs, in several places --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
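For context, a minimal sketch of the kind of check the description refers to; the constant and method names are illustrative and not necessarily what the patch itself uses.

```scala
// Minimal sketch of a size check that respects the ~Integer.MAX_VALUE - 8 cap
// some JVMs place on array lengths. Names are illustrative, not from the PR.
object ArrayAllocation {
  // Largest array length that common JVMs reliably allow; asking for more can
  // fail with OutOfMemoryError("Requested array size exceeds VM limit").
  val MaxArraySize: Int = Int.MaxValue - 8

  def allocateDoubles(requested: Long): Array[Double] = {
    require(requested >= 0 && requested <= MaxArraySize,
      s"Cannot allocate an array of $requested elements (max $MaxArraySize)")
    new Array[Double](requested.toInt)
  }

  def main(args: Array[String]): Unit = {
    println(allocateDoubles(1024).length)       // 1024
    // allocateDoubles(Int.MaxValue.toLong)     // would fail the check instead of the JVM
  }
}
```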
[GitHub] spark issue #19266: [SPARK-22033][CORE] BufferHolder, other size checks shou...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19266 **[Test build #81882 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81882/testReport)** for PR 19266 at commit [`8dbfa30`](https://github.com/apache/spark/commit/8dbfa30848b49a9fe4038c327222ddc9fd9a0ec0). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19266: [SPARK-22033][CORE] BufferHolder, other size checks shou...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19266 CC @maropu --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19254: [MINOR][CORE] Cleanup dead code and duplication in Mem. ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19254 **[Test build #3925 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3925/testReport)** for PR 19254 at commit [`6b408a0`](https://github.com/apache/spark/commit/6b408a0314c4cf9fbd371712175c95942aae2a89). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18945: Add option to convert nullable int columns to flo...
Github user a10y commented on a diff in the pull request: https://github.com/apache/spark/pull/18945#discussion_r139450187 --- Diff: python/pyspark/sql/dataframe.py --- @@ -1810,17 +1810,20 @@ def _to_scala_map(sc, jm): return sc._jvm.PythonUtils.toScalaMap(jm) -def _to_corrected_pandas_type(dt): +def _to_corrected_pandas_type(field, strict=True): """ When converting Spark SQL records to Pandas DataFrame, the inferred data type may be wrong. This method gets the corrected data type for Pandas if that type may be inferred uncorrectly. """ import numpy as np +dt = field.dataType if type(dt) == ByteType: return np.int8 elif type(dt) == ShortType: return np.int16 elif type(dt) == IntegerType: +if not strict and field.nullable: +return np.float32 --- End diff -- Is loss of precision a concern here? Some integers from the original dataset will now be rounded to the nearest representable float32 if I'm not mistaken. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
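The concern can be checked quickly: a 32-bit float has a 24-bit significand, so integers above 2^24 are not all exactly representable. Scala's `Float` is the same width as numpy's float32, so it serves as a stand-in here.

```scala
// Quick check of the precision concern: Scala's Float is a 32-bit IEEE float,
// the same width as the numpy float32 used in the patch.
object Float32Precision {
  def main(args: Array[String]): Unit = {
    val exact = 1 << 24            // 16777216: still exactly representable
    val inexact = (1 << 24) + 1    // 16777217: rounds to the nearest float32

    println(exact.toFloat.toInt)   // 16777216
    println(inexact.toFloat.toInt) // 16777216 -- the original value is lost
  }
}
```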
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19229 That doesn't look like the reason; maybe the issue is somewhere else. Let me run the test later. Thanks! But there are some small issues in the test. Don't include the data generation time:
```
val start = System.nanoTime()
val df2 = genData()
model.transform(df2).count
val end = System.nanoTime()
```
and add cache at the end of genData:
```
def genData() = {
  val df = spark.createDataFrame...
  df.cache()
  df.count() // force trigger cache
  df
}
```
and we'd better add warm-up code before recording the running time. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #12646: [SPARK-14878][SQL] Trim characters string function suppo...
Github user kevinyu98 commented on the issue: https://github.com/apache/spark/pull/12646 Can we retest this? The unknown return code is not related to the code changes. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19135: [SPARK-21923][CORE]Avoid calling reserveUnrollMemoryForT...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19135 **[Test build #81878 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81878/testReport)** for PR 19135 at commit [`e1dc7a4`](https://github.com/apache/spark/commit/e1dc7a46afb99fba9b489e6a5359bc347799e876). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19135: [SPARK-21923][CORE]Avoid calling reserveUnrollMemoryForT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19135 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19135: [SPARK-21923][CORE]Avoid calling reserveUnrollMemoryForT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19135 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81878/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18704 **[Test build #81883 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81883/testReport)** for PR 18704 at commit [`bdecaaf`](https://github.com/apache/spark/commit/bdecaaf045bb135239d09919af92f718e5d0b2c5). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15544 **[Test build #81881 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81881/testReport)** for PR 15544 at commit [`0e9a8dd`](https://github.com/apache/spark/commit/0e9a8dd4004c3331271105e4e9a3df963c3bc84b). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19210: [SPARK-22030][CORE] GraphiteSink fails to re-connect to ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19210 **[Test build #81875 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81875/testReport)** for PR 19210 at commit [`0458123`](https://github.com/apache/spark/commit/0458123c1b3827f8b4b55eeb8bd5f7dbc749a4aa). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15544 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81881/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15544: [SPARK-17997] [SQL] Add an aggregation function for coun...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15544 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19210: [SPARK-22030][CORE] GraphiteSink fails to re-connect to ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19210 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81875/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19211#discussion_r139458303 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { /** When `droppedEventsCounter` was logged last time in milliseconds. */ @volatile private var lastReportTimestamp = 0L - // Indicate if we are processing some event - // Guarded by `self` - private var processingEvent = false - - private val logDroppedEvent = new AtomicBoolean(false) - - // A counter that represents the number of events produced and consumed in the queue - private val eventLock = new Semaphore(0) - - private val listenerThread = new Thread(name) { -setDaemon(true) -override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - LiveListenerBus.withinListenerThread.withValue(true) { -val timer = metrics.eventProcessingTime -while (true) { - eventLock.acquire() - self.synchronized { -processingEvent = true - } - try { -val event = eventQueue.poll -if (event == null) { - // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { -throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. So `stopped` must be true") - } - return -} -val timerContext = timer.time() -try { - postToAll(event) -} finally { - timerContext.stop() -} - } finally { -self.synchronized { - processingEvent = false -} - } + private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() + + /** Add a listener to queue shared by all non-internal listeners. */ + def addToSharedQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, SHARED_QUEUE) + } + + /** Add a listener to the executor management queue. */ + def addToManagementQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, EXECUTOR_MGMT_QUEUE) + } + + /** Add a listener to the application status queue. */ + def addToStatusQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, APP_STATUS_QUEUE) + } + + /** Add a listener to the event log queue. */ + def addToEventLogQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, EVENT_LOG_QUEUE) + } + + /** + * Add a listener to a specific queue, creating a new queue if needed. Queues are independent + * of each other (each one uses a separate thread for delivering events), allowing slower + * listeners to be somewhat isolated from others. + */ + private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized { +if (stopped.get()) { + throw new IllegalStateException("LiveListenerBus is stopped.") --- End diff -- No. It's ok and actually expected to add listeners before the bus is started. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19234: [SPARK-22010][PySpark] Change fromInternal method of Tim...
Github user maver1ck commented on the issue: https://github.com/apache/spark/pull/19234 OK. It passed all tests, so let's merge it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19266: [SPARK-22033][CORE] BufferHolder, other size checks shou...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19266 I thought, since this limit depends heavily on JVM implementations, it might be better to put the limit in a global variable somewhere (e.g., `ARRAY_INT_MAX` in `spark.util.Utils` or another place)? As another option, how about adding an internal option in `SparkConf` for that? Also, how about making a parent JIRA ticket to track the similar issues, because it seems difficult to cover all the possible places in this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19211#discussion_r139458812 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { /** When `droppedEventsCounter` was logged last time in milliseconds. */ @volatile private var lastReportTimestamp = 0L - // Indicate if we are processing some event - // Guarded by `self` - private var processingEvent = false - - private val logDroppedEvent = new AtomicBoolean(false) - - // A counter that represents the number of events produced and consumed in the queue - private val eventLock = new Semaphore(0) - - private val listenerThread = new Thread(name) { -setDaemon(true) -override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - LiveListenerBus.withinListenerThread.withValue(true) { -val timer = metrics.eventProcessingTime -while (true) { - eventLock.acquire() - self.synchronized { -processingEvent = true - } - try { -val event = eventQueue.poll -if (event == null) { - // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { -throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. So `stopped` must be true") - } - return -} -val timerContext = timer.time() -try { - postToAll(event) -} finally { - timerContext.stop() -} - } finally { -self.synchronized { - processingEvent = false -} - } + private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() + + /** Add a listener to queue shared by all non-internal listeners. */ + def addToSharedQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, SHARED_QUEUE) + } + + /** Add a listener to the executor management queue. */ + def addToManagementQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, EXECUTOR_MGMT_QUEUE) + } + + /** Add a listener to the application status queue. */ + def addToStatusQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, APP_STATUS_QUEUE) + } + + /** Add a listener to the event log queue. */ + def addToEventLogQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, EVENT_LOG_QUEUE) + } + + /** + * Add a listener to a specific queue, creating a new queue if needed. Queues are independent + * of each other (each one uses a separate thread for delivering events), allowing slower + * listeners to be somewhat isolated from others. + */ + private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized { +if (stopped.get()) { + throw new IllegalStateException("LiveListenerBus is stopped.") +} + +queues.asScala.find(_.name == queue) match { + case Some(queue) => +queue.addListener(listener) + + case None => +val newQueue = new AsyncEventQueue(queue, conf, metrics) +newQueue.addListener(listener) +if (started.get() && !stopped.get()) { + newQueue.start(sparkContext) } - } +queues.add(newQueue) } } - override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { - metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface])) + def removeListener(listener: SparkListenerInterface): Unit = synchronized { +// Remove listener from all queues it was added to, and stop queues that have become empty. 
+queues.asScala + .filter { queue => +queue.removeListener(listener) +queue.listeners.isEmpty() + } + .foreach { toRemove => +if (started.get() && !stopped.get()) { + toRemove.stop() --- End diff -- That would waste resources in the normal case. The default configuration would never add anything to the default and event log queues, so you'd have two threads dispatching events to no listeners. Stopping the queue when it empties, while a rare case, handles when users add listeners and later remove them, potentially leaving the default queue empty. Singling out the default queue here just complicates the
[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19211#discussion_r139458935 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { /** When `droppedEventsCounter` was logged last time in milliseconds. */ @volatile private var lastReportTimestamp = 0L - // Indicate if we are processing some event - // Guarded by `self` - private var processingEvent = false - - private val logDroppedEvent = new AtomicBoolean(false) - - // A counter that represents the number of events produced and consumed in the queue - private val eventLock = new Semaphore(0) - - private val listenerThread = new Thread(name) { -setDaemon(true) -override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - LiveListenerBus.withinListenerThread.withValue(true) { -val timer = metrics.eventProcessingTime -while (true) { - eventLock.acquire() - self.synchronized { -processingEvent = true - } - try { -val event = eventQueue.poll -if (event == null) { - // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { -throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. So `stopped` must be true") - } - return -} -val timerContext = timer.time() -try { - postToAll(event) -} finally { - timerContext.stop() -} - } finally { -self.synchronized { - processingEvent = false -} - } + private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() + + /** Add a listener to queue shared by all non-internal listeners. */ + def addToSharedQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, SHARED_QUEUE) + } + + /** Add a listener to the executor management queue. */ + def addToManagementQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, EXECUTOR_MGMT_QUEUE) + } + + /** Add a listener to the application status queue. */ + def addToStatusQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, APP_STATUS_QUEUE) + } + + /** Add a listener to the event log queue. */ + def addToEventLogQueue(listener: SparkListenerInterface): Unit = { +addToQueue(listener, EVENT_LOG_QUEUE) + } + + /** + * Add a listener to a specific queue, creating a new queue if needed. Queues are independent + * of each other (each one uses a separate thread for delivering events), allowing slower + * listeners to be somewhat isolated from others. + */ + private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized { +if (stopped.get()) { + throw new IllegalStateException("LiveListenerBus is stopped.") +} + +queues.asScala.find(_.name == queue) match { + case Some(queue) => +queue.addListener(listener) + + case None => +val newQueue = new AsyncEventQueue(queue, conf, metrics) +newQueue.addListener(listener) +if (started.get() && !stopped.get()) { + newQueue.start(sparkContext) } - } +queues.add(newQueue) } } - override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { - metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface])) + def removeListener(listener: SparkListenerInterface): Unit = synchronized { +// Remove listener from all queues it was added to, and stop queues that have become empty. 
+queues.asScala + .filter { queue => +queue.removeListener(listener) +queue.listeners.isEmpty() + } + .foreach { toRemove => +if (started.get() && !stopped.get()) { + toRemove.stop() +} +queues.remove(toRemove) + } + } + + /** Post an event to all queues. */ + def post(event: SparkListenerEvent): Unit = { +if (!stopped.get()) { --- End diff -- No, as the class javadoc says, it's ok to send events before the bus is started. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user akopich commented on the issue: https://github.com/apache/spark/pull/18924 Ping @jkbradley. Thank you @WeichenXu123 once again for the comment! Please have a look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19211 **[Test build #81884 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81884/testReport)** for PR 19211 at commit [`77dd8ec`](https://github.com/apache/spark/commit/77dd8ecb549b8f5d7b7b20ac7384b8f56f1df67e). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18924 **[Test build #81885 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81885/testReport)** for PR 18924 at commit [`9ce9655`](https://github.com/apache/spark/commit/9ce9655d2275454e016dd3f3b640e578f4b6e0e4). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #12646: [SPARK-14878][SQL] Trim characters string function suppo...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/12646 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #12646: [SPARK-14878][SQL] Trim characters string function suppo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/12646 **[Test build #81886 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81886/testReport)** for PR 12646 at commit [`79846bf`](https://github.com/apache/spark/commit/79846bfd86c6265ebe7d14906853bfe2ec0467f3). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user brkyvz commented on the issue: https://github.com/apache/spark/pull/19196 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19230: [SPARK-22003][SQL] support array column in vectorized re...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19230 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81887 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81887/testReport)** for PR 19196 at commit [`be39125`](https://github.com/apache/spark/commit/be3912552d200fced24f69c436b59a2c24639380). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19230: [SPARK-22003][SQL] support array column in vector...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19230 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18853#discussion_r139464231 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -925,6 +925,12 @@ object SQLConf { .intConf .createWithDefault(1) + val BINARY_COMPARISON_COMPATIBLE_WITH_HIVE = +buildConf("spark.sql.binary.comparison.compatible.with.hive") + .doc("Whether compatible with Hive when binary comparison.") + .booleanConf + .createWithDefault(true) --- End diff -- This has to be `false`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18853#discussion_r139464467 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -925,6 +925,12 @@ object SQLConf { .intConf .createWithDefault(1) + val BINARY_COMPARISON_COMPATIBLE_WITH_HIVE = +buildConf("spark.sql.binary.comparison.compatible.with.hive") --- End diff -- -> `spark.sql.autoTypeCastingCompatibility` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18853#discussion_r139464749 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -925,6 +925,12 @@ object SQLConf { .intConf .createWithDefault(1) + val BINARY_COMPARISON_COMPATIBLE_WITH_HIVE = +buildConf("spark.sql.binary.comparison.compatible.with.hive") + .doc("Whether compatible with Hive when binary comparison.") --- End diff -- Binary comparison is just one of the implicit type casting cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
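Putting the review comments above together, the config entry might end up looking roughly like the fragment below (as it would sit inside `object SQLConf`, following the `buildConf` pattern visible in the quoted diff). This is a sketch of the suggestion, not the PR's final code.

```scala
// Sketch only: renamed key, broader doc, and a false default, per the review.
val AUTO_TYPE_CASTING_COMPATIBILITY =
  buildConf("spark.sql.autoTypeCastingCompatibility")
    .doc("When true, implicit type casts (binary comparison being one case) follow " +
      "Hive's coercion behavior instead of Spark's own rules.")
    .booleanConf
    .createWithDefault(false)
```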
[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18853#discussion_r139465565 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -352,11 +374,16 @@ object TypeCoercion { p.makeCopy(Array(Cast(left, TimestampType), right)) case p @ Equality(left @ TimestampType(), right @ StringType()) => p.makeCopy(Array(left, Cast(right, TimestampType))) - case p @ BinaryComparison(left, right) -if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => +if !plan.conf.binaryComparisonCompatibleWithHive && + findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType))) + case p @ BinaryComparison(left, right) +if plan.conf.binaryComparisonCompatibleWithHive && --- End diff -- This is hard to maintain and debug. Instead of mixing them together, could you separate it from the current rule Spark uses? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19229 @viirya I ran the code, and you're right: most of the time is spent on executedPlan generation (in the old version of the code). Thanks! But can you append a benchmark comparison with the `RDD.aggregate` version? https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala#L102 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
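The linked `StatFunctions` code computes column statistics with `RDD.aggregate`. Below is a generic, self-contained sketch of that style (per-column sums and a row count in one pass) for comparison purposes; it is illustrative, not the Imputer PR's code.

```scala
import org.apache.spark.sql.SparkSession

object AggregateStyle {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("aggregate-style").getOrCreate()
    import spark.implicits._

    val df = Seq((1.0, 10.0), (2.0, 20.0), (3.0, 30.0)).toDF("a", "b")
    val numCols = df.columns.length

    // Zero value: per-column running sums plus a row count.
    val (sums, count) = df.rdd.aggregate((Array.fill(numCols)(0.0), 0L))(
      { case ((s, n), row) =>
        var i = 0
        while (i < numCols) { s(i) += row.getDouble(i); i += 1 }
        (s, n + 1)
      },
      { case ((s1, n1), (s2, n2)) =>
        var i = 0
        while (i < numCols) { s1(i) += s2(i); i += 1 }
        (s1, n1 + n2)
      })

    println(sums.map(_ / count).mkString(", "))  // 2.0, 20.0
    spark.stop()
  }
}
```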
[GitHub] spark issue #19266: [SPARK-22033][CORE] BufferHolder, other size checks shou...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19266 Yeah, agree, it could be some global constant. I don't think it should be configurable. Ideally it's determined from the JVM, but don't know a way to do that. In many cases, assuming Int.MaxValue is the max array size when it's Int.MaxValue-8 doesn't matter much. For example, arguably I should leave the ML changes alone here, because, in the very rare case that a matrix size is somewhere between Int.MaxValue-8 and Int.MaxValue, it will fail anyway, and it's not avoidable given the user input. It's also, maybe, more conservative to not always assume anything beyond Int.MaxValue-8 is going to fail, and not "proactively" fail at this cutoff. However I think there are a smallish number of identifiable cases where Spark can very much avoid the failure (like BufferHolder), and they're the instances where an array size keeps doubling. Maybe we can stick to those clear cases? especially any one that seems to have triggered the original error? Those cases are few enough and related enough that I'm sure they're just one issue, not several. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
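For the "size keeps doubling" cases mentioned here, a hedged sketch of what a clamped growth step could look like; the names are illustrative, not BufferHolder's actual code.

```scala
object DoublingBuffer {
  private val MaxArraySize = Int.MaxValue - 8

  /** Return a buffer with room for `used + neededExtra` bytes, doubling when growing. */
  def grow(buf: Array[Byte], neededExtra: Int, used: Int): Array[Byte] = {
    val required = used.toLong + neededExtra
    if (required > MaxArraySize) {
      throw new UnsupportedOperationException(
        s"Cannot grow buffer to $required bytes (max $MaxArraySize)")
    }
    if (required <= buf.length) {
      buf
    } else {
      // Double the current size, but never past the cap; do the math in Long
      // to avoid Int overflow when buf.length is already large.
      val newSize = math.min(math.max(buf.length.toLong * 2, required), MaxArraySize.toLong).toInt
      java.util.Arrays.copyOf(buf, newSize)
    }
  }

  def main(args: Array[String]): Unit = {
    println(grow(new Array[Byte](64), 100, 50).length)  // 150
  }
}
```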
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19250 **[Test build #81888 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81888/testReport)** for PR 19250 at commit [`950d33a`](https://github.com/apache/spark/commit/950d33a4835e56748963dfe002bfa7145d91469f). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r139467580 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala --- @@ -624,7 +639,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } private def createTestConf(): SparkConf = { -new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) +new SparkConf() + .set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + .set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath()) --- End diff -- The `kvstore` module already tests both in-memory and disk stores extensively (running the same set of tests on both). There isn't really anything that testing both here can add to those tests. At most I could make `FsHistoryProvider` do round-robin between disk and in-memory; but I don't see the point of running these tests twice. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
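The "same tests against both stores" approach mentioned here is the usual shared-base-suite pattern. A generic sketch follows, with made-up interfaces; it is not the actual kvstore test code.

```scala
import org.scalatest.FunSuite

trait SimpleStore {
  def put(key: String, value: String): Unit
  def get(key: String): Option[String]
}

class InMemoryStore extends SimpleStore {
  private val data = scala.collection.mutable.Map.empty[String, String]
  override def put(key: String, value: String): Unit = data(key) = value
  override def get(key: String): Option[String] = data.get(key)
}

/** All behavioural tests live here; subclasses only choose which store to build. */
abstract class SimpleStoreSuiteBase extends FunSuite {
  protected def newStore(): SimpleStore

  test("put then get returns the stored value") {
    val store = newStore()
    store.put("k", "v")
    assert(store.get("k").contains("v"))
  }

  test("get of a missing key returns None") {
    assert(newStore().get("missing").isEmpty)
  }
}

class InMemoryStoreSuite extends SimpleStoreSuiteBase {
  override protected def newStore(): SimpleStore = new InMemoryStore()
  // A disk-backed implementation would get its own subclass reusing these tests.
}
```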
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r139467662 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala --- @@ -74,6 +76,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") .set("spark.testing", "true") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) --- End diff -- See previous reply. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r139468080 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -720,19 +633,67 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appId: String, attemptId: Option[String], prevFileSize: Long)(): Boolean = { -lookup(appId, attemptId) match { - case None => -logDebug(s"Application Attempt $appId/$attemptId not found") -false - case Some(latest) => -prevFileSize < latest.fileSize +try { + val attempt = getAttempt(appId, attemptId) + val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) + recordedFileSize(logPath) > prevFileSize +} catch { + case _: NoSuchElementException => false +} + } + + /** + * Return the last known size of the given event log, recorded the last time the file + * system scanner detected a change in the file. + */ + private def recordedFileSize(log: Path): Long = { +try { + listing.read(classOf[LogInfo], log.toString()).fileSize +} catch { + case _: NoSuchElementException => 0L } } + + private def load(appId: String): ApplicationInfoWrapper = { +listing.read(classOf[ApplicationInfoWrapper], appId) + } + + /** + * Write the app's information to the given store. Serialized to avoid the (notedly rare) case + * where two threads are processing separate attempts of the same application. + */ + private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized { +val attempt = app.attempts.head + +val oldApp = try { + listing.read(classOf[ApplicationInfoWrapper], app.id) +} catch { + case _: NoSuchElementException => +app +} + +def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = { + a1.info.startTime.getTime() > a2.info.startTime.getTime() +} + +val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++ + List(attempt) + +val newAppInfo = new ApplicationInfoWrapper( + app.info, + attempts.sortWith(compareAttemptInfo)) +listing.write(newAppInfo) --- End diff -- Yes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r139468045 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -720,19 +633,67 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appId: String, attemptId: Option[String], prevFileSize: Long)(): Boolean = { -lookup(appId, attemptId) match { - case None => -logDebug(s"Application Attempt $appId/$attemptId not found") -false - case Some(latest) => -prevFileSize < latest.fileSize +try { + val attempt = getAttempt(appId, attemptId) + val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) + recordedFileSize(logPath) > prevFileSize +} catch { + case _: NoSuchElementException => false +} + } + + /** + * Return the last known size of the given event log, recorded the last time the file + * system scanner detected a change in the file. + */ + private def recordedFileSize(log: Path): Long = { +try { + listing.read(classOf[LogInfo], log.toString()).fileSize +} catch { + case _: NoSuchElementException => 0L } } + + private def load(appId: String): ApplicationInfoWrapper = { +listing.read(classOf[ApplicationInfoWrapper], appId) + } + + /** + * Write the app's information to the given store. Serialized to avoid the (notedly rare) case + * where two threads are processing separate attempts of the same application. + */ + private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized { +val attempt = app.attempts.head --- End diff -- Because the listener processes a single event log; one event log == one attempt. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r139468324 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -720,19 +633,67 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appId: String, attemptId: Option[String], prevFileSize: Long)(): Boolean = { -lookup(appId, attemptId) match { - case None => -logDebug(s"Application Attempt $appId/$attemptId not found") -false - case Some(latest) => -prevFileSize < latest.fileSize +try { + val attempt = getAttempt(appId, attemptId) + val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) + recordedFileSize(logPath) > prevFileSize +} catch { + case _: NoSuchElementException => false +} + } + + /** + * Return the last known size of the given event log, recorded the last time the file + * system scanner detected a change in the file. + */ + private def recordedFileSize(log: Path): Long = { +try { + listing.read(classOf[LogInfo], log.toString()).fileSize +} catch { + case _: NoSuchElementException => 0L } } + + private def load(appId: String): ApplicationInfoWrapper = { +listing.read(classOf[ApplicationInfoWrapper], appId) + } + + /** + * Write the app's information to the given store. Serialized to avoid the (notedly rare) case + * where two threads are processing separate attempts of the same application. + */ + private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized { +val attempt = app.attempts.head + +val oldApp = try { + listing.read(classOf[ApplicationInfoWrapper], app.id) +} catch { + case _: NoSuchElementException => +app +} + +def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = { + a1.info.startTime.getTime() > a2.info.startTime.getTime() +} + +val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++ + List(attempt) + +val newAppInfo = new ApplicationInfoWrapper( + app.info, + attempts.sortWith(compareAttemptInfo)) +listing.write(newAppInfo) + } + + /** For testing. Returns internal data about a single attempt. */ + private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = { +load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse( + throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId.")) + } + } private[history] object FsHistoryProvider { - val DEFAULT_LOG_DIR = "file:/tmp/spark-events" - private val NOT_STARTED = "" --- End diff -- This existed for compatibility with really old event logs that 2.x does not support anymore. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r139468509 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -742,53 +703,150 @@ private[history] object FsHistoryProvider { private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + + /** + * Current version of the data written to the listing database. When opening an existing + * db, if the version does not match this value, the FsHistoryProvider will throw away + * all data and re-generate the listing data from the event logs. + */ + private val CURRENT_LISTING_VERSION = 1L } /** - * Application attempt information. - * - * @param logPath path to the log file, or, for a legacy log, its directory - * @param name application name - * @param appId application ID - * @param attemptId optional attempt ID - * @param startTime start time (from playback) - * @param endTime end time (from playback). -1 if the application is incomplete. - * @param lastUpdated the modification time of the log file when this entry was built by replaying - *the history. - * @param sparkUser user running the application - * @param completed flag to indicate whether or not the application has completed. - * @param fileSize the size of the log file the last time the file was scanned for changes + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. */ -private class FsApplicationAttemptInfo( +private class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) + +} + +private[history] case class KVStoreMetadata( + version: Long, + logDir: String) + +private[history] case class LogInfo( + @KVIndexParam logPath: String, + fileSize: Long) + +private[history] class AttemptInfoWrapper( +val info: v1.ApplicationAttemptInfo, val logPath: String, -val name: String, -val appId: String, -attemptId: Option[String], -startTime: Long, -endTime: Long, -lastUpdated: Long, -sparkUser: String, -completed: Boolean, -val fileSize: Long, -appSparkVersion: String) - extends ApplicationAttemptInfo( - attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) { - - /** extend the superclass string value with the extra attributes of this class */ - override def toString: String = { -s"FsApplicationAttemptInfo($name, $appId," + - s" ${super.toString}, source=$logPath, size=$fileSize" +val fileSize: Long) { + + def toAppAttemptInfo(): ApplicationAttemptInfo = { +ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), + info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser, + info.completed, info.appSparkVersion) } + } -/** - * Application history information - * @param id application ID - * @param name application name - * @param attempts list of attempts, most recent first. 
- */ -private class FsApplicationHistoryInfo( -id: String, -override val name: String, -override val attempts: List[FsApplicationAttemptInfo]) - extends ApplicationHistoryInfo(id, name, attempts) +private[history] class ApplicationInfoWrapper( +val info: v1.ApplicationInfo, +val attempts: List[AttemptInfoWrapper]) { + + @JsonIgnore @KVIndexParam + def id: String = info.id + + @JsonIgnore @KVIndexParam("endTime") + def endTime(): Long = attempts.head.info.endTime.getTime() + + @JsonIgnore @KVIndexParam("oldestAttempt") + def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min + + def toAppHistoryInfo(): ApplicationHistoryInfo = { +ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo())) + } + + def toApiInfo(): v1.ApplicationInfo = { +new v1.ApplicationInfo(info.id, info.name, info.coresGranted, info.maxCores, + info.coresPerExecutor, info.memoryPerExecutorMB, attempts.map(_.info)) + } + +} + +private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener { + + private val app = new MutableApplicationInfo() + private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen()) + + override def
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18924 **[Test build #81885 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81885/testReport)** for PR 18924 at commit [`9ce9655`](https://github.com/apache/spark/commit/9ce9655d2275454e016dd3f3b640e578f4b6e0e4). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18924 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18924 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81885/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19222 **[Test build #81889 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81889/testReport)** for PR 19222 at commit [`7c2c0cb`](https://github.com/apache/spark/commit/7c2c0cb832101802a6c753d1f07d541284e602a7). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r139470472 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase --- End diff -- We cannot reference an outer variable and modify it inside the `map` function; doing so produces an undefined result. You need to create the `logphatPartOption` object inside the `map` function, like: ``` val localOptimizeDocConcentration = optimizeDocConcentration batch.mapPartitions { docs => ... val logphatPartOption = if (localOptimizeDocConcentration) Some(BDV.zeros[Double](k)) else None ... ``` Also note: avoid using `optimizeDocConcentration` directly in the `rdd.map` function, because that var is a class member and referencing it will cause the whole class object to be serialized. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
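For readers following along, the pattern described above generalizes beyond LDAOptimizer: copy the class member into a local `val` before the RDD closure so that only that value is captured, and create any per-partition mutable state inside the task. A minimal sketch of the idea (the class and field names are invented for illustration, not taken from the PR):

```scala
import org.apache.spark.rdd.RDD

// Hypothetical class, used only to show the capture pattern being recommended.
class PartitionAggregator(optimizeConcentration: Boolean) {

  def run(data: RDD[Int]): Array[Double] = {
    // Copy the class member into a local val so the closure captures only a Boolean,
    // not the whole (possibly non-serializable) enclosing instance.
    val localOptimize = optimizeConcentration

    data.mapPartitions { iter =>
      // Per-partition mutable state is created here, inside the task, rather than
      // being captured from the driver and mutated remotely.
      val acc = if (localOptimize) Some(new Array[Double](4)) else None
      iter.foreach { x => acc.foreach(a => a(x % 4) += x.toDouble) }
      Iterator.single(acc.getOrElse(Array.fill(4)(0.0)))
    }.reduce { (a, b) => a.indices.foreach(i => a(i) += b(i)); a }
  }
}
```

The same reasoning applies to `logphatPartOptionBase` in the diff: building the per-partition buffer inside `mapPartitions` keeps the serialized closure small and avoids relying on driver-side state being mutated by executors.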
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r139467949 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase nonEmptyDocs.foreach { case (_, termCounts: Vector) => val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption)) +} + +val elementWiseSumInPlace = (u : (BDM[Double], Option[BDV[Double]]), + v : (BDM[Double], Option[BDV[Double]])) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + u +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]]) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase))( +elementWiseSumInPlace, elementWiseSumInPlace + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= batchSize.toDouble) +logphatOption.foreach(updateAlpha(_, batchSize)) + +expElogbetaBc.destroy(false) +stats.unpersist() --- End diff -- This line is useless. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19238: [SPARK-22016][SQL] Add HiveDialect for JDBC connection t...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19238 I can see the value, but it does not perform well in most cases when going through a JDBC connection. Instead of adding the extra dialect upstream, could you please add Hive as a separate data source? Thanks! https://spark.apache.org/third-party-projects.html --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
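For context on the suggestion, a JDBC dialect does not have to be merged upstream to be used; a third-party package can register one at runtime. A minimal sketch of that route, with a hypothetical dialect name and an illustrative quoting rule (this is not the HiveDialect proposed in the PR):

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical dialect shipped in an external package rather than inside Spark.
object ExternalHiveDialect extends JdbcDialect {
  // Claim JDBC URLs that point at a HiveServer2 endpoint.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  // Hive quotes identifiers with backticks instead of double quotes.
  override def quoteIdentifier(colName: String): String = s"`$colName`"
}

object ExternalHiveDialectSetup {
  // Call once at application startup, before issuing JDBC reads or writes.
  def register(): Unit = JdbcDialects.registerDialect(ExternalHiveDialect)
}
```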
[GitHub] spark issue #19256: [SPARK-21338][SQL]implement isCascadingTruncateTable() m...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19256 It looks good, but the actual code should be very simple if you write it in an idiomatic Scala way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
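For reference, the per-dialect change being discussed boils down to overriding a single method on `JdbcDialect`; a short sketch with a hypothetical dialect (the return value here is illustrative, not a statement about any particular database):

```scala
import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical dialect object; the interesting part is the one overridden method.
object ExampleDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:exampledb")

  // Some(true)/Some(false) when the TRUNCATE TABLE cascading behaviour is known
  // for this database, None when it is unknown.
  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
```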
[GitHub] spark issue #19261: [SPARK-22040] Add current_date function with timezone id
Github user rxin commented on the issue: https://github.com/apache/spark/pull/19261 What does this even mean? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19267: [WIP][SPARK-20628][CORE] Blacklist nodes when the...
GitHub user juanrh opened a pull request: https://github.com/apache/spark/pull/19267 [WIP][SPARK-20628][CORE] Blacklist nodes when they transition to DECOMMISSIONING state in YARN ## What changes were proposed in this pull request? Dynamic cluster configurations where cluster nodes are added and removed frequently are common in public cloud environments, so this has become a [problem for Spark users](https://www.trulia.com/blog/tech/aws-emr-ad-hoc-spark-development-environme... ). To cope with this, we propose implementing a mechanism along the lines of YARN's support for [graceful node decommission](https://issues.apache.org/jira/browse/YARN-914 ) or [Mesos maintenance primitives](https://mesos.apache.org/documentation/latest/maintenance/ ). These changes allow cluster nodes to be transitioned to a "decommissioning" state, at which point no more tasks will be scheduled on the executors running on those nodes. After a configurable drain time, nodes in the "decommissioning" state will transition to a "decommissioned" state, where shuffle blocks are not available anymore. Shuffle blocks stored on nodes in the "decommissioning" state are still available to other executors. By preventing more tasks from running on nodes in the "decommissioning" state we avoid creating more shuffle blocks on those nodes, as those blocks won't be available when the nodes eventually transition to the "decommissioned" state. We have implemented a first version of this proposal for YARN, using Spark's [blacklisting mechanism for task scheduling](https://issues.apache.org/jira/browse/SPARK-8425 ) (available at the node level since Spark 2.2.0) to ensure tasks are not scheduled on nodes in the "decommissioning" state. With this solution it is the cluster manager, not the Spark application, that tracks the status of the node and handles the transition from "decommissioning" to "decommissioned". The Spark driver simply reacts to the node state transitions. ## How was this patch tested? All functionality has been tested with unit tests, with an integration test based on the BaseYarnClusterSuite, and with manual testing on a cluster. You can merge this pull request into a Git repository by running: $ git pull https://github.com/juanrh/spark SPARK-20628-yarn-decommissioning-blacklisting Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19267.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19267 commit 35285871a40fbf0903784dfb8dc16bd4e37062fe Author: Juan Rodriguez Hortala Date: 2017-08-23T16:58:24Z Send Host status update signals to YarnSchedulerBackend, on Yarn node state changes commit 42891dafcfe7d25026719b6025c1272e7fe2d947 Author: Juan Rodriguez Hortala Date: 2017-08-23T18:49:04Z Add mechanism to Blacklist/Unblacklist nodes based on Node status changes in Cluster Manager commit 0c840c74a85012a4d91e349ae4830455ef3d680b Author: Juan Rodriguez Hortala Date: 2017-08-23T19:56:31Z Add configuration to independently enable/disable task execution blacklisting and decommissioning blacklisting commit f9fdfb01ac2486cc268b13568d129f926b3b8ab2 Author: Juan Rodriguez Hortala Date: 2017-08-23T20:29:49Z Integration test for Yarn node decommissioning --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19267: [WIP][SPARK-20628][CORE] Blacklist nodes when they trans...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19267 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18887 **[Test build #81890 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81890/testReport)** for PR 18887 at commit [`56b68a0`](https://github.com/apache/spark/commit/56b68a06e5fc41e423a10ed767c1eb50d62395f5). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19261: [SPARK-22040] Add current_date function with timezone id
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19261 I think we should not do it, because no DB vendor does it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19268: Incorrect Metric reported in MetricsReporter.scal...
GitHub user Taaffy opened a pull request: https://github.com/apache/spark/pull/19268 Incorrect Metric reported in MetricsReporter.scala The current implementation for processingRate-total uses the wrong metric: it mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond. ## What changes were proposed in this pull request? Adjust processingRate-total to use processedRowsPerSecond instead of inputRowsPerSecond. ## How was this patch tested? Built Spark from source with the proposed change and tested the output with the correct parameter. Before the change, the CSV metrics files for inputRate-total and processingRate-total displayed the same values due to the error. After changing MetricsReporter.scala, the processingRate-total CSV file displayed the correct metric. https://user-images.githubusercontent.com/32072374/30554340-82eea12c-9ca4-11e7-8370-8168526ff9a2.png Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Taaffy/spark patch-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19268.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19268 commit afe083ff45313ed07cb95ded4c089ead7d80ecce Author: Taaffy <32072374+taa...@users.noreply.github.com> Date: 2017-09-18T16:56:51Z Incorrect Metric reported in MetricsReporter.scala Current implementation for processingRate-total uses wrong metric: mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
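To make the input-rate versus processing-rate distinction concrete, here is a small self-contained sketch in the style of Dropwizard metrics gauges; the `Progress` case class and its values are invented for illustration, and this is not the actual MetricsReporter code:

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}

// Toy stand-in for a streaming query progress object.
case class Progress(inputRowsPerSecond: Double, processedRowsPerSecond: Double)

object GaugeWiringExample extends App {
  @volatile var lastProgress = Progress(inputRowsPerSecond = 120.0, processedRowsPerSecond = 95.0)

  val registry = new MetricRegistry
  registry.register("inputRate-total", new Gauge[Double] {
    override def getValue: Double = lastProgress.inputRowsPerSecond
  })
  // The reported bug was wiring this gauge to inputRowsPerSecond as well; it must
  // read processedRowsPerSecond so the two metrics can diverge when the query lags.
  registry.register("processingRate-total", new Gauge[Double] {
    override def getValue: Double = lastProgress.processedRowsPerSecond
  })

  println(registry.getGauges.get("processingRate-total").getValue) // 95.0
}
```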
[GitHub] spark issue #19268: Incorrect Metric reported in MetricsReporter.scala
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19268 Please make a JIRA @Taaffy --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19268: Incorrect Metric reported in MetricsReporter.scala
Github user Taaffy commented on the issue: https://github.com/apache/spark/pull/19268 Will do. Should I delete this pull request afterwards? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19268: Incorrect Metric reported in MetricsReporter.scala
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19268 There's no way to make the change without a PR, so no, leave it. http://spark.apache.org/contributing.html --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19268: Incorrect Metric reported in MetricsReporter.scala
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19268 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19232: [SPARK-22009][ML] Using treeAggregate improve some algs
Github user sethah commented on the issue: https://github.com/apache/spark/pull/19232 Sure, we all agree there is a mechanism for avoiding overhead. However, performance tests are very tricky things, 5% is not a huge improvement, and hard-coding the aggregation depth to `2` limits the utility of using `treeAggregate`. I think the change is probably fine for just the reason that `treeAggregate` shouldn't hurt performance and might speed things up. Still, I don't think there's enough information yet to determine under what circumstances this actually improves the performance, if any. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
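For readers unfamiliar with the knob under discussion: `treeAggregate` accepts a `depth` argument (default 2) that controls how many rounds of executor-side combining happen before partial results reach the driver, which is why hard-coding it to 2 removes the tuning lever. A minimal runnable sketch:

```scala
import org.apache.spark.sql.SparkSession

object TreeAggregateDepthExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("tree-agg").master("local[4]").getOrCreate()
    val data = spark.sparkContext.parallelize(1L to 1000000L, numSlices = 64)

    // Plain aggregate: every partition result is sent straight to the driver.
    val flat = data.aggregate(0L)(_ + _, _ + _)

    // treeAggregate with depth = 3: partition results go through extra executor-side
    // combining rounds, reducing the fan-in at the driver when there are many partitions.
    val tree = data.treeAggregate(0L)(_ + _, _ + _, depth = 3)

    assert(flat == tree)
    spark.stop()
  }
}
```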
[GitHub] spark issue #19268: Incorrect Metric reported in MetricsReporter.scala
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19268 **[Test build #3926 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3926/testReport)** for PR 19268 at commit [`afe083f`](https://github.com/apache/spark/commit/afe083ff45313ed07cb95ded4c089ead7d80ecce). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19218 - You can get it from the table metadata `table: CatalogTable`. - `InsertIntoHadoopFsRelationCommand.scala` is for data source tables. We only have this issue for Hive table writing, right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
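As a sketch of what reading the setting from table metadata could look like, with the table property taking precedence over the session conf (the property key and fallback chain are illustrative, not the final design of the PR):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.internal.SQLConf

object CompressionResolution {
  // Resolve the Parquet codec: table-level property first, then the session conf,
  // then a hard-coded default. "parquet.compression" is used here only as an example key.
  def resolveParquetCompression(table: CatalogTable, conf: SQLConf): String = {
    table.properties
      .get("parquet.compression")
      .getOrElse(conf.getConfString("spark.sql.parquet.compression.codec", "snappy"))
  }
}
```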
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/19269 [SPARK-22026][SQL][WIP] data source v2 write path ## What changes were proposed in this pull request? A working prototype for data source v2 write path. TODO: doc. ## How was this patch tested? new tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark data-source-v2-write Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19269.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19269 commit 7b6a6b785bd56f9c7cec8999aa31c0427eb05e55 Author: Wenchen Fan Date: 2017-09-18T17:12:55Z data source v2 write path --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19269 **[Test build #81891 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81891/testReport)** for PR 19269 at commit [`7b6a6b7`](https://github.com/apache/spark/commit/7b6a6b785bd56f9c7cec8999aa31c0427eb05e55). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19269 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19269 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81891/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19269 **[Test build #81891 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81891/testReport)** for PR 19269 at commit [`7b6a6b7`](https://github.com/apache/spark/commit/7b6a6b785bd56f9c7cec8999aa31c0427eb05e55). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan)` * `class RowToUnsafeRowWriteTask(rowWriteTask: WriteTask[Row], schema: StructType)` * `class RowToUnsafeRowDataWriter(rowWriter: DataWriter[Row], encoder: ExpressionEncoder[Row])` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19266: [SPARK-22033][CORE] BufferHolder, other size chec...
Github user buryat commented on a diff in the pull request: https://github.com/apache/spark/pull/19266#discussion_r139489491 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -39,7 +39,7 @@ private final long length; public LongArray(MemoryBlock memory) { -assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 4 billion elements"; +assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 2.1 billion elements"; --- End diff -- `assert memory.size() < (long) (Integer.MAX_VALUE - 8) * 8` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19266: [SPARK-22033][CORE] BufferHolder, other size chec...
Github user buryat commented on a diff in the pull request: https://github.com/apache/spark/pull/19266#discussion_r139489658 --- Diff: core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java --- @@ -30,11 +30,15 @@ HashMapGrowthStrategy DOUBLING = new Doubling(); class Doubling implements HashMapGrowthStrategy { + +private static final int ARRAY_MAX = Integer.MAX_VALUE - 8; --- End diff -- Maybe worth adding a comment explaining why this value is chosen as the max, like here: http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l223 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
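Background on the constant: like the JDK's `ArrayList.MAX_ARRAY_SIZE`, `Integer.MAX_VALUE - 8` leaves room for the array object header, since some VMs throw `OutOfMemoryError` when asked for an array of length exactly `Integer.MAX_VALUE`. A small illustrative growth helper clamped the same way (not the PR's code):

```scala
object GrowthExample {
  // Same constant the JDK's ArrayList uses for MAX_ARRAY_SIZE: leave headroom for
  // the array object header on VMs that count it against the maximum length.
  val ArrayMax: Int = Int.MaxValue - 8

  /** Double the capacity, clamping to the largest safely allocatable array length. */
  def nextCapacity(current: Int, required: Int): Int = {
    require(required >= 0 && required <= ArrayMax, s"Cannot grow past $ArrayMax elements")
    val doubled = math.min(current.toLong * 2, ArrayMax.toLong) // Long math avoids Int overflow
    math.max(doubled, required.toLong).toInt
  }
}
```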
[GitHub] spark pull request #19266: [SPARK-22033][CORE] BufferHolder, other size chec...
Github user buryat commented on a diff in the pull request: https://github.com/apache/spark/pull/19266#discussion_r139492024 --- Diff: core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala --- @@ -126,22 +126,20 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable /** Increase our size to newSize and grow the backing array if needed. */ private def growToSize(newSize: Int): Unit = { -if (newSize < 0) { - throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements") +val arrayMax = Int.MaxValue - 8 +if (newSize < 0 || newSize - 2 > arrayMax) { + throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements") } val capacity = if (otherElements != null) otherElements.length + 2 else 2 if (newSize > capacity) { - var newArrayLen = 8 + var newArrayLen = 8L while (newSize - 2 > newArrayLen) { --- End diff -- I think we can remove `- 2` now since `newArrayLen` is a Long --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139493581 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.File + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload} +import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.util.Utils + + +/** + * A physical plan that evaluates a [[PythonUDF]], + */ +case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan) + extends SparkPlan { + + def children: Seq[SparkPlan] = child :: Nil + + override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length)) + + private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = { --- End diff -- Yes, these functions are duplicated, as well as some code in `doExecute()`. I could add a common base class like `EvalPythonExec` to clean this up, and maybe move to the same file? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r139496371 --- Diff: python/pyspark/sql/functions.py --- @@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()): | 8| JOHN DOE| 22| +--+--++ """ -def _udf(f, returnType=StringType()): -udf_obj = UserDefinedFunction(f, returnType) -return udf_obj._wrapped() +return _create_udf(f, returnType=returnType, vectorized=False) -# decorator @udf, @udf() or @udf(dataType()) -if f is None or isinstance(f, (str, DataType)): -# If DataType has been passed as a positional argument -# for decorator use it as a returnType -return_type = f or returnType -return functools.partial(_udf, returnType=return_type) + +@since(2.3) +def pandas_udf(f=None, returnType=StringType()): +""" +Creates a :class:`Column` expression representing a user defined function (UDF) that accepts +`Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length. + +:param f: python function if used as a standalone function +:param returnType: a :class:`pyspark.sql.types.DataType` object + +# TODO: doctest +""" +import inspect +# If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder +if inspect.getargspec(f).keywords is None: +return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True) --- End diff -- If the user function doesn't define the keyword args, then it is wrapped with a placeholder so that `worker.py` can expect the function to always have keywords. I thought this was better than trying to do inspection on the worker while running the UDF. I'm not crazy about the 0-parameter pandas_udf, but if we have to support it here then it does need to get the required length of output somehow, unless we repeat/slice the output to make the length correct. I'm ok with making `**kwargs` mandatory for 0-parameter UDFs and optional for others. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18704 **[Test build #81883 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81883/testReport)** for PR 18704 at commit [`bdecaaf`](https://github.com/apache/spark/commit/bdecaaf045bb135239d09919af92f718e5d0b2c5). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18704 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81883/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18704 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19250 **[Test build #81888 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81888/testReport)** for PR 19250 at commit [`950d33a`](https://github.com/apache/spark/commit/950d33a4835e56748963dfe002bfa7145d91469f). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19266: [SPARK-22033][CORE] BufferHolder, other size checks shou...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19266 **[Test build #81882 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81882/testReport)** for PR 19266 at commit [`8dbfa30`](https://github.com/apache/spark/commit/8dbfa30848b49a9fe4038c327222ddc9fd9a0ec0). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org