[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r195287202 --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.protocol; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import com.google.common.base.Objects; +import io.netty.buffer.ByteBuf; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.buffer.NettyManagedBuffer; + +/** + * An RPC with data that is sent outside of the frame, so it can be read as a stream. + */ +public final class UploadStream extends AbstractMessage implements RequestMessage { --- End diff -- Is it possible to merge UploadStream and RpcRequest into a class? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
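For context, the question above is whether the two request types could collapse into one message class. A minimal sketch of that idea, using hypothetical names (`CombinedRpcRequest`, `streamByteCount`) rather than Spark's actual classes, might look like this:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch only, not Spark code: one request message that covers both the
// plain RPC case and the "body sent outside the frame" case via an optional stream length.
public final class CombinedRpcRequest {
  public final long requestId;
  public final ByteBuffer meta;        // always sent in-frame
  public final long streamByteCount;   // > 0 means the body follows out of frame as a stream

  public CombinedRpcRequest(long requestId, ByteBuffer meta, long streamByteCount) {
    this.requestId = requestId;
    this.meta = meta;
    this.streamByteCount = streamByteCount;
  }

  public boolean hasStreamBody() {
    return streamByteCount > 0;
  }
}
```

The trade-off is that every handler of such a merged message would need to deal with the optional streaming case.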
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r195284967 --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java --- @@ -220,30 +196,91 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) { handler.addRpcRequest(requestId, callback); channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))) -.addListener(future -> { - if (future.isSuccess()) { -long timeTaken = System.currentTimeMillis() - startTime; -if (logger.isTraceEnabled()) { - logger.trace("Sending request {} to {} took {} ms", requestId, -getRemoteAddress(channel), timeTaken); -} - } else { -String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId, - getRemoteAddress(channel), future.cause()); -logger.error(errorMsg, future.cause()); -handler.removeRpcRequest(requestId); -channel.close(); -try { - callback.onFailure(new IOException(errorMsg, future.cause())); -} catch (Exception e) { - logger.error("Uncaught exception in RPC response callback handler!", e); -} - } -}); + .addListener(new RpcChannelListener(startTime, requestId, callback)); + +return requestId; + } + + /** + * Send data to the remote end as a stream. This differs from stream() in that this is a request + * to *send* data to the remote end, not to receive it from the remote. + * + * @param meta meta data associated with the stream, which will be read completely on the + * receiving end before the stream itself. + * @param data this will be streamed to the remote end to allow for transferring large amounts + * of data without reading into memory. + * @param callback handles the reply -- onSuccess will only be called when both message and data + * are received successfully. + */ + public long uploadStream( + ManagedBuffer meta, + ManagedBuffer data, + RpcResponseCallback callback) { +long startTime = System.currentTimeMillis(); +if (logger.isTraceEnabled()) { + logger.trace("Sending RPC to {}", getRemoteAddress(channel)); +} + +long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); --- End diff -- This `Math.abs(UUID.randomUUID().getLeastSignificantBits());` is repeated twice. Move it to a separate new method . --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
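A minimal sketch of the extraction being suggested, with a hypothetical helper name (`nextRequestId`); in TransportClient this would presumably just be a private method shared by sendRpc() and uploadStream():

```java
import java.util.UUID;

// Sketch of the suggested refactoring: generate the request id in one place instead of
// duplicating Math.abs(UUID.randomUUID().getLeastSignificantBits()) at each call site.
final class RequestIds {
  private RequestIds() {}

  static long nextRequestId() {
    return Math.abs(UUID.randomUUID().getLeastSignificantBits());
  }
}
```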
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r192279111 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java --- @@ -38,15 +38,24 @@ * * This method will not be called in parallel for a single TransportClient (i.e., channel). * + * The rpc *might* included a data stream in streamData (eg. for uploading a large + * amount of data which should not be buffered in memory here). Any errors while handling the + * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail. + * If stream data is not null, you *must* call streamData.registerStreamCallback + * before this method returns. + * * @param client A channel client which enables the handler to make requests back to the sender * of this RPC. This will always be the exact same object for a particular channel. * @param message The serialized bytes of the RPC. + * @param streamData StreamData if there is data which is meant to be read via a StreamCallback; + * otherwise it is null. * @param callback Callback which should be invoked exactly once upon success or failure of the * RPC. */ public abstract void receive( TransportClient client, ByteBuffer message, + StreamData streamData, --- End diff -- What about incorporating parameter `message` into parameter `streamData`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r191628993 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java --- @@ -38,15 +38,24 @@ * * This method will not be called in parallel for a single TransportClient (i.e., channel). * + * The rpc *might* included a data stream in streamData (eg. for uploading a large + * amount of data which should not be buffered in memory here). Any errors while handling the + * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail. + * If stream data is not null, you *must* call streamData.registerStreamCallback + * before this method returns. + * * @param client A channel client which enables the handler to make requests back to the sender * of this RPC. This will always be the exact same object for a particular channel. * @param message The serialized bytes of the RPC. + * @param streamData StreamData if there is data which is meant to be read via a StreamCallback; + * otherwise it is null. * @param callback Callback which should be invoked exactly once upon success or failure of the * RPC. */ public abstract void receive( TransportClient client, ByteBuffer message, + StreamData streamData, --- End diff -- It's not necessary to add a parameter. Change the message parameter to InputStream. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
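A minimal sketch of the alternative proposed here: rather than adding a `streamData` parameter, the RPC body itself is exposed as an `InputStream`, so small in-memory RPCs and large uploads share one signature. This is not the actual Spark API; `Client` and `Callback` below are placeholders standing in for TransportClient and RpcResponseCallback.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

// Placeholder types so the sketch is self-contained.
interface Client {}
interface Callback {
  void onSuccess(ByteBuffer response);
  void onFailure(Throwable cause);
}

// Hypothetical handler shape: the serialized RPC (and any trailing uploaded data)
// is read from the InputStream instead of being passed as a ByteBuffer plus StreamData.
abstract class StreamingRpcHandler {
  public abstract void receive(Client client, InputStream message, Callback callback);
}
```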
[GitHub] spark issue #14658: [WIP][SPARK-5928][SPARK-6238] Remote Shuffle Blocks cann...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/14658 Spark 2.2 has fixed this issue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17882: [WIP][SPARK-20079][yarn] Re registration of AM ha...
Github user witgo closed the pull request at: https://github.com/apache/spark/pull/17882 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17882: [WIP][SPARK-20079][yarn] Re registration of AM hangs spa...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17882 I'm very sorry, I haven't taken the time to update this code recently. @vanzin , thank you for your work. I'll close this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #14995: [Test Only][SPARK-6235][CORE]Address various 2G limits
Github user witgo commented on the issue: https://github.com/apache/spark/pull/14995 I have not done much testing, but I think it can be used in a production environment. The branch is here: https://github.com/witgo/spark/tree/SPARK-6235_Address_various_2G_limits --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17139: [SPARK-19486][CORE](try 3) Investigate using multiple th...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17139 @jiangxb1987 Yes. Do you have any questions? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17882: [WIP][SPARK-20079][yarn] Re registration of AM ha...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17882#discussion_r121972913 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -176,16 +179,6 @@ private[spark] abstract class YarnSchedulerBackend( } /** - * Reset the state of SchedulerBackend to the initial state. This is happened when AM is failed - * and re-registered itself to driver after a failure. The stale state in driver should be - * cleaned. - */ - override protected def reset(): Unit = { --- End diff -- I think `ExecutorAllocationManager#reset` is still necessary, but the following code should be removed ```scala initializing = true numExecutorsTarget = initialNumExecutors numExecutorsToAdd = 1 ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17882: [SPARK-20079][yarn] Re registration of AM hangs s...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17882#discussion_r120652302 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -68,6 +68,8 @@ private[spark] abstract class YarnSchedulerBackend( // Flag to specify whether this schedulerBackend should be reset. private var shouldResetOnAmRegister = false + private var lastRequestExecutors = RequestExecutors(-1, -1, Map.empty, Set.empty) --- End diff -- Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17882: [SPARK-20079][yarn] Re registration of AM hangs spark cl...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17882 @vanzin Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17882: [SPARK-20079][yarn] Re registration of AM hangs s...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17882#discussion_r120644588 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -176,16 +179,6 @@ private[spark] abstract class YarnSchedulerBackend( } /** - * Reset the state of SchedulerBackend to the initial state. This is happened when AM is failed - * and re-registered itself to driver after a failure. The stale state in driver should be - * cleaned. - */ - override protected def reset(): Unit = { --- End diff -- Judging from the current code, resetting the state of `ExecutorAllocationManager` here is not correct. It causes the `RetrieveLastRequestExecutors` message to not work properly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17882: [WIP][SPARK-20079][try 2][yarn] Re registration of AM ha...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17882 @jerryshao @vanzin Would you take some time to review this PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18008: [SPARK-20776] Fix perf. problems in JobProgressListener ...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/18008 @JoshRosen I see, Thank you. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18008: [SPARK-20776] Fix perf. problems in JobProgressListener ...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/18008 @JoshRosen , what's the tool in your screenshot? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17882: [WIP][SPARK-20079][try 2][yarn] Re registration o...
GitHub user witgo opened a pull request: https://github.com/apache/spark/pull/17882 [WIP][SPARK-20079][try 2][yarn] Re registration of AM hangs spark cluster in yarn-client mode. See #17480 You can merge this pull request into a Git repository by running: $ git pull https://github.com/witgo/spark SPARK-20079_try2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17882.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17882 commit fd4c486725ae49d068e67088a185fb4cc229e21f Author: Guoqiang Li Date: 2017-03-30T14:17:49Z SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode. commit fa27d7f6da49659c8aa6efe55a3e457e883eb17a Author: Guoqiang Li Date: 2017-04-04T04:33:45Z review commits commit 6341d31c2961a08db2f36339f5ebe8c814eeb4c7 Author: Guoqiang Li Date: 2017-04-07T10:32:29Z review commits commit e992df93e7222d5d2bd66d9a2c19984c9b241fd5 Author: Guoqiang Li Date: 2017-04-23T04:56:58Z Delete "initializing = true" in ExecutorAllocationManager.reset commit 917cf43ffaaeb20347df3c7e480cb75ae87dca83 Author: Guoqiang Li Date: 2017-05-06T13:28:28Z Add msg: RetrieveLastAllocatedExecutorNumber --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r112825043 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,6 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true --- End diff -- @jerryshao @vanzin I think that deleting the `initializing = true` is a good idea. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17480 OK, I will do the work over the weekend. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17480 @vanzin Sorry, I do not understand what you mean. Will you submit a new PR with your own ideas? If so, I will close this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r53390 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true +/** + * When some tasks need to be scheduled and initial executor = 0, resetting the initializing + * field may cause it to not be set to false in yarn. + * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079 + */ +if (maxNumExecutorsNeeded() == 0) { + initializing = true --- End diff -- OK. I've got it, thx. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r110804557 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true +/** + * When some tasks need to be scheduled and initial executor = 0, resetting the initializing + * field may cause it to not be set to false in yarn. + * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079 + */ +if (maxNumExecutorsNeeded() == 0) { + initializing = true --- End diff -- The following code should have a similar function? ```scala numExecutorsTarget = initialNumExecutors // The default value is 0 numExecutorsToAdd = 1 ``` The incoming parameters of the client.requestTotalExecutors method are 1,2,4,8,16... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r110796578 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true +/** + * When some tasks need to be scheduled and initial executor = 0, resetting the initializing + * field may cause it to not be set to false in yarn. + * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079 + */ +if (maxNumExecutorsNeeded() == 0) { + initializing = true --- End diff -- @jerryshao Can you explain the following comments? I do not understand. ```scala if (initializing) { // Do not change our target while we are still initializing, // Otherwise the first job may have to ramp up unnecessarily 0 } else if (maxNeeded < numExecutorsTarget) { ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17567: [SPARK-19991][CORE][YARN] FileSegmentManagedBuffer perfo...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17567 OK, I see. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17567: [SPARK-19991][CORE][YARN] FileSegmentManagedBuffer perfo...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17567 LGTM. Are there any performance test reports? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r110361779 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true +/** + * When some tasks need to be scheduled, resetting the initializing field may cause + * it to not be set to false in yarn. --- End diff -- Currently this method will only be called in yarn-client mode when AM re-registers after a failure. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r109575470 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,9 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true +if (maxNumExecutorsNeeded() == 0) { --- End diff -- Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17480 @jerryshao Yes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17480 The ExecutorAllocationManager.reset method is called when the AM re-registers, and it sets the ExecutorAllocationManager.initializing field to true. While this field is true, the driver does not request new executors from the AM. Only two events set the field back to false: 1. an executor has been idle for some time; 2. a new stage is submitted. If the AM is killed and restarted after the stage has already been submitted, neither event can occur: 1. When the AM is killed, YARN kills all running containers, so all executors are lost and no executor will ever be idle. 2. With no surviving executors, the current stage can never complete, so the DAG scheduler will not submit a new stage. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17480: SPARK-20079: Re registration of AM hangs spark cl...
GitHub user witgo opened a pull request: https://github.com/apache/spark/pull/17480 SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode. When there is some need of task scheduling, `ExecutorAllocationManager` instances do not reset the `initializing` field ## How was this patch tested? Unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/witgo/spark SPARK-20079 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17480.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17480 commit b91dfeb4fea445727f6b5430aa947f35a287d56d Author: Guoqiang Li Date: 2017-03-30T14:17:49Z SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo closed the pull request at: https://github.com/apache/spark/pull/17329 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r108049460 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff -- Sorry,I didn't get your idea. Can you write some code? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r108032378 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff -- Suppose there are E executors in the cluster and a shuffle has M map tasks and R reduce tasks. The master branch creates: 1. up to M * R FileSegmentManagedBuffer instances; 2. up to 2 * M * R NoSuchElementException instances. This PR creates: 1. up to M * R FileSegmentManagedBuffer instances; 2. up to 2 * NoSuchElementException instances (ExternalShuffleBlockResolver and IndexShuffleBlockResolver are created once when the executor starts, and they call the new constructor to create FileSegmentManagedBuffer instances). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r108027203 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff -- This branch [SPARK-19991_try2 ](https://github.com/witgo/spark/commits/SPARK-19991_try2) needs `244.45` s in my test --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r107841105 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff -- Yes, but the above code does not improve performance; the `FileSegmentManagedBuffer.convertToNetty` method is also called only once in the master branch code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r107706851 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff -- Like the following code?
```java
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
  this.lazyFileDescriptor = conf.lazyFileDescriptor();
  this.memoryMapBytes = conf.memoryMapBytes();
  this.file = file;
  this.offset = offset;
  this.length = length;
}
```
The call to `conf.lazyFileDescriptor()` or `conf.memoryMapBytes()` creates a NoSuchElementException instance. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r107572297 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff -- That will change a lot of code, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r106781598 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff -- Oh, do you have a better idea? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17329: [SPARK-19991]FileSegmentManagedBuffer performance improv...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17329
```java
public class HadoopConfigProvider extends ConfigProvider {
  private final Configuration conf;

  public HadoopConfigProvider(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public String get(String name) {
    String value = conf.get(name);
    // When spark.storage.memoryMapThreshold or spark.shuffle.io.lazyFD is not set,
    // conf.get(name) returns null and a new NoSuchElementException is created here.
    if (value == null) {
      throw new NoSuchElementException(name);
    }
    return value;
  }

  @Override
  public Iterable<Map.Entry<String, String>> getAll() {
    return conf;
  }
}
```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
GitHub user witgo opened a pull request: https://github.com/apache/spark/pull/17329 [SPARK-19991] FileSegmentManagedBuffer performance improvement
## What changes were proposed in this pull request?
When the configuration items `spark.storage.memoryMapThreshold` and `spark.shuffle.io.lazyFD` are not set, each call to the FileSegmentManagedBuffer.nioByteBuffer or FileSegmentManagedBuffer.createInputStream method creates a NoSuchElementException instance, which is a relatively expensive operation. In the use case below, this PR improves performance by about 3.5%.
The test code:
```scala
(1 to 10).foreach { i =>
  val numPartition = 1
  val rdd = sc.parallelize(0 until numPartition).repartition(numPartition).flatMap { t =>
    (0 until numPartition).map(r => r * numPartition + t)
  }.repartition(numPartition)
  val serializeStart = System.currentTimeMillis()
  rdd.sum()
  val serializeFinish = System.currentTimeMillis()
  println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f")
}
```
and `spark-defaults.conf` file:
```
spark.master yarn-client
spark.executor.instances 20
spark.driver.memory 64g
spark.executor.memory 30g
spark.executor.cores 5
spark.default.parallelism 100
spark.sql.shuffle.partitions 100
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize 0
spark.ui.enabled false
spark.driver.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=512M
spark.executor.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M
spark.cleaner.referenceTracking.blocking true
spark.cleaner.referenceTracking.blocking.shuffle true
```
The test results are as follows:
| [SPARK-19991](https://github.com/witgo/spark/tree/SPARK-19991) | [68ea290](https://github.com/apache/spark/commit/68ea290b3aa89b2a539d13ea2c18bdb5a651b2bf) |
| --- | --- |
| 226.09 s | 235.21 s |
## How was this patch tested?
Existing tests.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/witgo/spark SPARK-19991 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17329.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17329 commit abcfc79991ecd1d5cef2cd1e275b872695ba19d9 Author: Guoqiang Li Date: 2017-03-17T03:19:37Z FileSegmentManagedBuffer performance improvement --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17139 @kayousterhout The test report has been updated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17139 Added multi-threaded code for serializing `TaskDescription`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17139 ping @kayousterhout @squito --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17116: [SPARK-18890][CORE](try 2) Move task serializatio...
Github user witgo closed the pull request at: https://github.com/apache/spark/pull/17116 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 [SPARK-18890_20170303](https://github.com/witgo/spark/commits/SPARK-18890_20170303)'s code is older, but the test case running time is 5.2 s. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 Yes, multi-threaded task serialization may give better performance. Let me close this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo closed the pull request at: https://github.com/apache/spark/pull/15505 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17139: [WIP][SPARK-18890][CORE](try 3) Move task seriali...
GitHub user witgo opened a pull request: https://github.com/apache/spark/pull/17139 [WIP][SPARK-18890][CORE](try 3) Move task serialization from the TaskSetManager to the CoarseGrainedSchedulerBackend ## What changes were proposed in this pull request? See https://issues.apache.org/jira/browse/SPARK-18890 In the case of stage has a lot of tasks, this PR can improve the scheduling performance of ~~15%~~ The test code: ``` scala val rdd = sc.parallelize(0 until 100).repartition(10) rdd.localCheckpoint().count() rdd.sum() (1 to 10).foreach{ i=> val serializeStart = System.currentTimeMillis() rdd.sum() val serializeFinish = System.currentTimeMillis() println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f") } ``` and `spark-defaults.conf` file: ``` spark.master yarn-client spark.executor.instances 20 spark.driver.memory 64g spark.executor.memory 30g spark.executor.cores 5 spark.default.parallelism 100 spark.sql.shuffle.partitions 100 spark.serializer org.apache.spark.serializer.KryoSerializer spark.driver.maxResultSize0 spark.ui.enabled false spark.driver.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=512M spark.executor.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M spark.cleaner.referenceTracking.blocking true spark.cleaner.referenceTracking.blocking.shuffle true ``` The test results are as follows **The table is out of date, to be updated** | [SPARK-17931](https://github.com/witgo/spark/tree/SPARK-17931) | [941b3f9](https://github.com/apache/spark/commit/941b3f9aca59e62c078508a934f8c2221ced96ce) | | --- | --- | | 17.116 s | 21.764 s | ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/witgo/spark SPARK-18890-multi-threading Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17139.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17139 commit bfa285b1bd677e0c0b8a57ceda4433bb8ae072e9 Author: Guoqiang Li Date: 2017-03-02T14:50:54Z Move task serialization from the TaskSetManager to the CoarseGrainedSchedulerBackend --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 I do not know which PR caused the run time of this test case to drop from 21.764 s to 9.566 s. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 @kayousterhout Test results have been updated:
| [SPARK-17931](https://github.com/witgo/spark/tree/SPARK-17931) | [db0ddce](https://github.com/apache/spark/commit/db0ddce523bb823cba996e92ef36ceca31492d2c) |
| --- | --- |
| 9.427 s | 9.566 s |
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 @kayousterhout It takes some time to update the test report. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103636619 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception + private def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) + } catch { +case e: Exception => logError("Exception in error callback", e) + } +} + } + + private def isZombieTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Unit = scheduler.synchronized { +!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie) + } + + private def getTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Option[TaskSetManager] = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId) + } + + private[scheduler] def prepareSerializedTask( --- End diff -- @kayousterhout @squito Refactoring code in https://github.com/apache/spark/pull/17116 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17116: [SPARK-18890][CORE](try 2) Move task serializatio...
GitHub user witgo opened a pull request: https://github.com/apache/spark/pull/17116 [SPARK-18890][CORE](try 2) Move task serialization from the TaskSetManager to the CoarseGrainedSchedulerBackend ## What changes were proposed in this pull request? See https://issues.apache.org/jira/browse/SPARK-18890 In the case of stage has a lot of tasks, this PR can improve the scheduling performance of ~~15%~~ The test code: ``` scala val rdd = sc.parallelize(0 until 100).repartition(10) rdd.localCheckpoint().count() rdd.sum() (1 to 10).foreach{ i=> val serializeStart = System.currentTimeMillis() rdd.sum() val serializeFinish = System.currentTimeMillis() println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f") } ``` and `spark-defaults.conf` file: ``` spark.master yarn-client spark.executor.instances 20 spark.driver.memory 64g spark.executor.memory 30g spark.executor.cores 5 spark.default.parallelism 100 spark.sql.shuffle.partitions 100 spark.serializer org.apache.spark.serializer.KryoSerializer spark.driver.maxResultSize0 spark.ui.enabled false spark.driver.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=512M spark.executor.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M spark.cleaner.referenceTracking.blocking true spark.cleaner.referenceTracking.blocking.shuffle true ``` The test results are as follows **The table is out of date, to be updated** | [SPARK-17931](https://github.com/witgo/spark/tree/SPARK-17931) | [941b3f9](https://github.com/apache/spark/commit/941b3f9aca59e62c078508a934f8c2221ced96ce) | | --- | --- | | 17.116 s | 21.764 s | ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/witgo/spark SPARK-18890 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17116.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17116 commit 84550a73b3f786b2ac569500a95ac38dc6f44657 Author: Guoqiang Li Date: 2017-01-08T11:18:59Z Move task serialization from the TaskSetManager to the CoarseGrainedSchedulerBackend commit 39aa22e8b95a25cd250eeafeb5ec800dfa794896 Author: Guoqiang Li Date: 2017-01-11T06:05:53Z review commits commit d562727dfd554c78ea17e590841a1e74b9b4f9aa Author: Guoqiang Li Date: 2017-01-13T02:10:03Z add test "Scheduler aborts stages that have unserializable partition" commit fc6789e027e8ea935ad392cfca90dd318a6d9e57 Author: Imran Rashid Date: 2017-01-13T21:42:44Z refactor commit 1edcf2a7e65e7c9373782824a71ec87909e88097 Author: Guoqiang Li Date: 2017-01-16T01:10:44Z create all the serialized tasks to make sure they all work commit 79dda74ab27eb9a2630921816305e489aef4f72e Author: Guoqiang Li Date: 2017-01-22T03:28:04Z review commits commit 0b20da4c0f79d89b0689e19c6b5e3fcdf8b360fb Author: Guoqiang Li Date: 2017-01-25T01:09:20Z add lock on the scheduler object commit b5de21f510697d42a9c7f7f255d20a41641a5122 Author: Kay Ousterhout Date: 2017-02-07T00:38:48Z Consolidate TaskDescrition constructors. This commit also does all task serializion in the encode() method, so now the encode() method just takes the TaskDescription as an input parameter. 
commit 819a88cb74a41c446a442ff91fb14f1093025f77 Author: Guoqiang Li Date: 2017-02-07T15:21:11Z Refactor the taskDesc serialization code commit 900884b82f67d2f51f73aff49b671b2dcb450264 Author: Guoqiang Li Date: 2017-02-09T16:58:15Z Add ut: serialization task errors do not affect each other commit 0812fc9067b9a0652de64e5c0539eaed9d8f243d Author: Guoqiang Li Date: 2017-02-25T17:17:03Z askWithRetry => askSync commit 0dae93a1edd2644cb63656ae56d41adaa59cf5e3 Author: Guoqiang Li Date: 2017-02-27T10:19:13Z fix the import ordering in TaskDescription.scala commit 9dab121247cf8b30912f016c404279acd0b42f41 Author: Guoqiang Li Date: 2017-03-01T08:06:39Z review commits commit b67bdaf52e44b320c565bef79fd2dac6904620ae Author: Guoqiang Li Date: 2017-03-01T08:51:59Z move prepareSerializedTask to TaskSchedulerImpl.scala --- If your project is set up for it, you c
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631341 --- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala --- @@ -164,17 +164,18 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug index = 0, addedFiles = Map[String, Long](), addedJars = Map[String, Long](), - properties = new Properties, - serializedTask) + properties = new Properties) } - private def runTaskAndGetFailReason(taskDescription: TaskDescription): TaskFailedReason = { + private def runTaskAndGetFailReason( +taskDescription: TaskDescription, +serializedTask: ByteBuffer): TaskFailedReason = { --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631404 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -130,7 +152,7 @@ private[spark] object TaskDescription { // Create a sub-buffer for the serialized task into its own buffer (to be deserialized later). val serializedTask = byteBuffer.slice() -new TaskDescription(taskId, attemptNumber, executorId, name, index, taskFiles, taskJars, - properties, serializedTask) +(new TaskDescription(taskId, attemptNumber, executorId, name, index, taskFiles, taskJars, + properties), serializedTask) --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631322 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,8 +55,26 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { - +// Task object corresponding to the TaskDescription. This is only defined on the master; on +// the worker, the Task object is handled separately from the TaskDescription so that it can +// deserialized after the TaskDescription is deserialized. --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631360 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception + private def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) + } catch { +case e: Exception => logError("Exception in error callback", e) + } +} + } + + private def isZombieTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Unit = scheduler.synchronized { +!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie) + } + + private def getTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Option[TaskSetManager] = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId) + } + + private[scheduler] def prepareSerializedTask( +scheduler: TaskSchedulerImpl, +task: TaskDescription, +abortSet: HashSet[TaskSetManager], +maxRpcMessageSize: Long): ByteBuffer = { --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631278 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception + private def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) + } catch { +case e: Exception => logError("Exception in error callback", e) + } +} + } + + private def isZombieTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Unit = scheduler.synchronized { +!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie) + } + + private def getTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Option[TaskSetManager] = scheduler.synchronized { --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631373 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -195,6 +197,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp makeOffers() } + // Only be used for testing. + case ReviveOffers => --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631354 --- Diff: core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala --- @@ -82,9 +88,15 @@ private[spark] class LocalEndpoint( def reviveOffers() { val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) +val abortTaskSet = new HashSet[TaskSetManager]() for (task <- scheduler.resourceOffers(offers).flatten) { - freeCores -= scheduler.CPUS_PER_TASK - executor.launchTask(executorBackend, task) + val buffer = prepareSerializedTask(scheduler, task, --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631300 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception + private def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) + } catch { +case e: Exception => logError("Exception in error callback", e) + } +} + } + + private def isZombieTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Unit = scheduler.synchronized { +!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie) + } + + private def getTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Option[TaskSetManager] = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId) + } + + private[scheduler] def prepareSerializedTask( +scheduler: TaskSchedulerImpl, +task: TaskDescription, +abortSet: HashSet[TaskSetManager], +maxRpcMessageSize: Long): ByteBuffer = { +var serializedTask: ByteBuffer = null +if (abortSet.isEmpty || !getTaskSetManager(scheduler, task.taskId).exists(_.isZombie)) { + try { +serializedTask = TaskDescription.encode(task) + } catch { +case NonFatal(e) => + abortTaskSetManager(scheduler, task.taskId, +s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) + scheduler.taskIdToTaskSetManager.get(task.taskId).foreach(t => abortSet.add(t)) + } +} + +if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) { + val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + +"spark.rpc.message.maxSize (%d bytes). Consider increasing " + +"spark.rpc.message.maxSize or using broadcast variables for large values." + abortTaskSetManager(scheduler, task.taskId, +msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) + getTaskSetManager(scheduler, task.taskId).foreach(t => abortSet.add(t)) + serializedTask = null +} else if (serializedTask != null) { + emittedTaskSizeWarning(scheduler, serializedTask, task.taskId) +} +serializedTask + } + + private def emittedTaskSizeWarning( --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631294 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception + private def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) + } catch { +case e: Exception => logError("Exception in error callback", e) + } +} + } + + private def isZombieTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Unit = scheduler.synchronized { +!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie) + } + + private def getTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Option[TaskSetManager] = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId) + } + + private[scheduler] def prepareSerializedTask( +scheduler: TaskSchedulerImpl, +task: TaskDescription, +abortSet: HashSet[TaskSetManager], +maxRpcMessageSize: Long): ByteBuffer = { +var serializedTask: ByteBuffer = null +if (abortSet.isEmpty || !getTaskSetManager(scheduler, task.taskId).exists(_.isZombie)) { --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631260 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception + private def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) + } catch { +case e: Exception => logError("Exception in error callback", e) + } +} + } + + private def isZombieTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Unit = scheduler.synchronized { --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631249 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,8 +55,26 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { - +// Task object corresponding to the TaskDescription. This is only defined on the master; on +// the worker, the Task object is handled separately from the TaskDescription so that it can +// deserialized after the TaskDescription is deserialized. +@transient private val task: Task[_] = null) extends Logging { + + /** + * Serializes the task for this TaskDescription and returns the serialized task. + * + * This method should only be used on the master (to serialize a task to send to a worker). --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103631254 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception + private def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) + } catch { +case e: Exception => logError("Exception in error callback", e) --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103627873 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception --- End diff -- I don't know why the old code ignored the exception, just copied it here. :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103627459 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception + private def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) + } catch { +case e: Exception => logError("Exception in error callback", e) + } +} + } + + private def isZombieTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Unit = scheduler.synchronized { +!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie) + } + + private def getTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Option[TaskSetManager] = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId) + } + + private[scheduler] def prepareSerializedTask( --- End diff -- OK I will think over your suggestion. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103622591 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception + private def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) + } catch { +case e: Exception => logError("Exception in error callback", e) + } +} + } + + private def isZombieTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Unit = scheduler.synchronized { +!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie) + } + + private def getTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long): Option[TaskSetManager] = scheduler.synchronized { +scheduler.taskIdToTaskSetManager.get(taskId) + } + + private[scheduler] def prepareSerializedTask( +scheduler: TaskSchedulerImpl, +task: TaskDescription, +abortSet: HashSet[TaskSetManager], +maxRpcMessageSize: Long): ByteBuffer = { +var serializedTask: ByteBuffer = null +if (abortSet.isEmpty || !getTaskSetManager(scheduler, task.taskId).exists(_.isZombie)) { --- End diff -- This is an optimization code, in most cases abortSet is empty and the `!getTaskSetManager(scheduler, task.taskId).exists(_.isZombie)` is not called. I changed it into a more readable code. see https://github.com/apache/spark/pull/15505/commits/2f4b3f955cad0fe7546de54737c19500b90ad67d --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103622493 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -454,33 +452,15 @@ private[spark] class TaskSetManager( currentLocalityIndex = getLocalityIndex(taskLocality) lastLaunchTime = curTime } -// Serialize and return the task -val serializedTask: ByteBuffer = try { - ser.serialize(task) -} catch { - // If the task cannot be serialized, then there's no point to re-attempt the task, - // as it will always fail. So just abort the whole task-set. - case NonFatal(e) => -val msg = s"Failed to serialize task $taskId, not attempting to retry it." -logError(msg, e) -abort(s"$msg Exception during serialization: $e") -throw new TaskNotSerializableException(e) -} -if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && - !emittedTaskSizeWarning) { - emittedTaskSizeWarning = true - logWarning(s"Stage ${task.stageId} contains a task of very large size " + -s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + -s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") -} + addRunningTask(taskId) // We used to log the time it takes to serialize the task, but task size is already // a good proxy to task serialization time. // val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + - s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") --- End diff -- taskLocality only exists here. How about change CoarseGrainedSchedulerBackend (line 273) to ``` scala logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + s"${executorData.executorHost}, serializedTask: ${serializedTask.limit} bytes.") ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 Jenkins, retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r103174824 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -23,7 +23,10 @@ import java.util.Properties import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, Map} +import scala.util.control.NonFatal +import org.apache.spark.internal.Logging +import org.apache.spark.TaskNotSerializableException --- End diff -- I'm sorry, I forgot that. It's done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 @kayousterhout I think the latest code is ready to merge into the master branch. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r102891969 --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala --- @@ -39,16 +40,18 @@ private[spark] sealed trait MapStatus { * necessary for correctness, since block fetchers are allowed to skip zero-size blocks. */ def getSizeForBlock(reduceId: Int): Long + + def numberOfOutput: Int --- End diff -- Could the number of outputs be greater than 2G (i.e., larger than an Int can hold)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
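If the per-map output count really can exceed 2G, one option is to widen the field to Long. The sketch below is written under that assumption; `MapStatusSketch` is a hypothetical stand-in, not the real trait.

```scala
// Hypothetical sketch, assuming the output count can exceed Int.MaxValue.
trait MapStatusSketch {
  def getSizeForBlock(reduceId: Int): Long

  // Long instead of Int, so a count above ~2.1 billion cannot silently overflow.
  def numberOfOutput: Long
}
```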
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 Okay, this may take some time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r99848516 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -243,27 +245,42 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { - for (task <- tasks.flatten) { -val serializedTask = TaskDescription.encode(task) -if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => -try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + -"spark.rpc.message.maxSize (%d bytes). Consider increasing " + -"spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) -} catch { - case e: Exception => logError("Exception in error callback", e) -} + val serializedTasks = tasks.flatten.map { task => +var serializedTask: ByteBuffer = null +try { + serializedTask = TaskDescription.encode(task, task.serializedTask) + if (serializedTask.limit >= maxRpcMessageSize) { +val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + + "spark.rpc.message.maxSize or using broadcast variables for large values." +abortTaskSetManager(scheduler, task.taskId, + msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) +serializedTask = null + } else if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { + scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning). + foreach { taskSetMgr => +taskSetMgr.emittedTaskSizeWarning = true +val stageId = taskSetMgr.taskSet.stageId +logWarning(s"Stage $stageId contains a task of very large size " + + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } } +} catch { + case NonFatal(e) => +abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) } -else { +(task, serializedTask) + } + + if (!serializedTasks.exists(b => b._2 eq null)) { --- End diff -- OK --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
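As a side note on the check being discussed, an equivalent but arguably clearer formulation than `!serializedTasks.exists(b => b._2 eq null)` is sketched below. The helper name and the simplified element type are hypothetical; it assumes `serializedTasks` pairs each task with its (possibly null) serialized buffer, as in the quoted diff.

```scala
import java.nio.ByteBuffer

object LaunchCheckSketch {
  // Hypothetical helper: launch the batch only if every task serialized successfully.
  def allSerialized(serializedTasks: Seq[(AnyRef, ByteBuffer)]): Boolean =
    serializedTasks.forall { case (_, buf) => buf != null }
}
```

`forall` reads as "every buffer is present" and avoids the double negative.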
[GitHub] spark pull request #16806: [WIP][SPARK-18890][CORE] Move task serialization ...
Github user witgo closed the pull request at: https://github.com/apache/spark/pull/16806 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16806: [WIP][SPARK-18890][CORE] Move task serialization ...
GitHub user witgo opened a pull request: https://github.com/apache/spark/pull/16806 [WIP][SPARK-18890][CORE] Move task serialization from the TaskSetManager to the CoarseGrainedSchedulerBackend ## What changes were proposed in this pull request? See https://issues.apache.org/jira/browse/SPARK-18890 In the case of stage has a lot of tasks, this PR can improve the scheduling performance of ~~15%~~ The test code: ``` scala val rdd = sc.parallelize(0 until 100).repartition(10) rdd.localCheckpoint().count() rdd.sum() (1 to 10).foreach{ i=> val serializeStart = System.currentTimeMillis() rdd.sum() val serializeFinish = System.currentTimeMillis() println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f") } ``` and `spark-defaults.conf` file: ``` spark.master yarn-client spark.executor.instances 20 spark.driver.memory 64g spark.executor.memory 30g spark.executor.cores 5 spark.default.parallelism 100 spark.sql.shuffle.partitions 100 spark.serializer org.apache.spark.serializer.KryoSerializer spark.driver.maxResultSize0 spark.ui.enabled false spark.driver.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=512M spark.executor.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M spark.cleaner.referenceTracking.blocking true spark.cleaner.referenceTracking.blocking.shuffle true ``` The test results are as follows **The table is out of date, to be updated** | [SPARK-17931](https://github.com/witgo/spark/tree/SPARK-17931) | [941b3f9](https://github.com/apache/spark/commit/941b3f9aca59e62c078508a934f8c2221ced96ce) | | --- | --- | | 17.116 s | 21.764 s | ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/witgo/spark SPARK-18890-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16806.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16806 commit ab5a763375e5b9308e55acbedfb1e7bf2cb739de Author: Guoqiang Li Date: 2017-01-08T11:18:59Z Move task serialization from the TaskSetManager to the CoarseGrainedSchedulerBackend commit 292a8bcf09fce3826b658c18c5d923379346fe52 Author: Guoqiang Li Date: 2017-01-11T06:05:53Z review commits commit 469586efd4abf47a5f891a6a4b72bba83e608aaf Author: Guoqiang Li Date: 2017-01-13T02:10:03Z add test "Scheduler aborts stages that have unserializable partition" commit 8f7edc6c16c25aae6fae4f6dc6fa76eca8f06fd6 Author: Guoqiang Li Date: 2017-02-04T14:07:51Z Refactor the serialization TaskDescription code --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r99463382 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { -val serializedTask = TaskDescription.encode(task) -if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => -try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + -"spark.rpc.message.maxSize (%d bytes). Consider increasing " + -"spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) -} catch { - case e: Exception => logError("Exception in error callback", e) -} - } +val serializedTask = try { + TaskDescription.encode(task) +} catch { + case NonFatal(e) => +abortTaskSetManager(scheduler, task.taskId, --- End diff -- If we do not deal with the problem (2), it should only lead to performance degradation when a serialization error occurred? or add the following code ```scala // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { val abortSet = new mutable.HashSet[TaskSetManager]() for (task <- tasks.flatten) { var serializedTask: ByteBuffer = null if (abortSet.isEmpty || !scheduler.taskIdToTaskSetManager.get(task.taskId).exists(_.isZombie)) { try { serializedTask = TaskDescription.encode(task) } catch { case NonFatal(e) => abortTaskSetManager(scheduler, task.taskId, s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) scheduler.taskIdToTaskSetManager.get(task.taskId).foreach(t => abortSet.add(t)) } } if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) { val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + "spark.rpc.message.maxSize or using broadcast variables for large values." abortTaskSetManager(scheduler, task.taskId, msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) scheduler.taskIdToTaskSetManager.get(task.taskId).foreach(t => abortSet.add(t)) } else if (serializedTask != null) { if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning). foreach { taskSetMgr => taskSetMgr.emittedTaskSizeWarning = true val stageId = taskSetMgr.taskSet.stageId logWarning(s"Stage $stageId contains a task of very large size " + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") } } val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " + s" hostname: ${executorData.executorHost}.") executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } } } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r99462808 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -243,27 +245,42 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { - for (task <- tasks.flatten) { -val serializedTask = TaskDescription.encode(task) -if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => -try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + -"spark.rpc.message.maxSize (%d bytes). Consider increasing " + -"spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) -} catch { - case e: Exception => logError("Exception in error callback", e) -} + val serializedTasks = tasks.flatten.map { task => +var serializedTask: ByteBuffer = null +try { + serializedTask = TaskDescription.encode(task, task.serializedTask) + if (serializedTask.limit >= maxRpcMessageSize) { +val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + + "spark.rpc.message.maxSize or using broadcast variables for large values." +abortTaskSetManager(scheduler, task.taskId, + msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) +serializedTask = null + } else if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { + scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning). + foreach { taskSetMgr => +taskSetMgr.emittedTaskSizeWarning = true +val stageId = taskSetMgr.taskSet.stageId +logWarning(s"Stage $stageId contains a task of very large size " + + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } } +} catch { + case NonFatal(e) => +abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) } -else { +(task, serializedTask) + } + + if (!serializedTasks.exists(b => b._2 eq null)) { --- End diff -- I will reset the relevant code. The current has the potential to affect other taskSet scheduling. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r99462431 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -66,7 +100,8 @@ private[spark] object TaskDescription { } } - def encode(taskDescription: TaskDescription): ByteBuffer = { + @throws[TaskNotSerializableException] + def encode(taskDescription: TaskDescription, serializedTask: ByteBuffer): ByteBuffer = { --- End diff -- Oh, I'm already confused, should I remove the commit https://github.com/apache/spark/pull/15505/commits/5e03e2cf9786af6d3ea555f6f61b6a60b23f1c2c? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r99462387 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -243,27 +245,42 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { - for (task <- tasks.flatten) { -val serializedTask = TaskDescription.encode(task) -if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => -try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + -"spark.rpc.message.maxSize (%d bytes). Consider increasing " + -"spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) -} catch { - case e: Exception => logError("Exception in error callback", e) -} + val serializedTasks = tasks.flatten.map { task => +var serializedTask: ByteBuffer = null +try { + serializedTask = TaskDescription.encode(task, task.serializedTask) + if (serializedTask.limit >= maxRpcMessageSize) { +val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + + "spark.rpc.message.maxSize or using broadcast variables for large values." +abortTaskSetManager(scheduler, task.taskId, + msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) +serializedTask = null + } else if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { + scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning). + foreach { taskSetMgr => +taskSetMgr.emittedTaskSizeWarning = true +val stageId = taskSetMgr.taskSet.stageId +logWarning(s"Stage $stageId contains a task of very large size " + + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } } +} catch { + case NonFatal(e) => +abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) } -else { +(task, serializedTask) + } + + if (!serializedTasks.exists(b => b._2 eq null)) { --- End diff -- @kayousterhout I made a mistake, task 2 will be re-scheduled when checkSpeculatableTasks is called. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r99458252 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -51,8 +54,39 @@ private[spark] class TaskDescription( val index: Int,// Index within this task's TaskSet val addedFiles: Map[String, Long], val addedJars: Map[String, Long], -val properties: Properties, -val serializedTask: ByteBuffer) { +val properties: Properties) extends Logging { + + def this( + taskId: Long, + attemptNumber: Int, + executorId: String, + name: String, + index: Int, // Index within this task's TaskSet + addedFiles: Map[String, Long], + addedJars: Map[String, Long], + properties: Properties, + task: Task[_]) { + this(taskId, attemptNumber, executorId, name, index, +addedFiles, addedJars, properties) + task_ = task + } + + def serializedTask: ByteBuffer = { --- End diff -- OK. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r99458165 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -51,8 +54,39 @@ private[spark] class TaskDescription( val index: Int,// Index within this task's TaskSet val addedFiles: Map[String, Long], val addedJars: Map[String, Long], -val properties: Properties, -val serializedTask: ByteBuffer) { +val properties: Properties) extends Logging { + + def this( + taskId: Long, + attemptNumber: Int, + executorId: String, + name: String, + index: Int, // Index within this task's TaskSet + addedFiles: Map[String, Long], + addedJars: Map[String, Long], + properties: Properties, + task: Task[_]) { + this(taskId, attemptNumber, executorId, name, index, +addedFiles, addedJars, properties) + task_ = task --- End diff -- I think giving TaskDescription two constructors is more readable, and we can add a comment noting that the constructor with the `task: Task[_]` parameter is only called on the driver. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
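For illustration, a sketch of the two-constructor shape with the suggested comment added. The structure mirrors the diff above; the comment wording is an assumption, and imports and the remaining members are omitted:

```scala
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,                     // Index within this task's TaskSet
    val addedFiles: Map[String, Long],
    val addedJars: Map[String, Long],
    val properties: Properties) extends Logging {

  // Driver-side only: executors obtain a TaskDescription via decode(), which never
  // carries a live Task instance, so this constructor must not be called there.
  def this(
      taskId: Long,
      attemptNumber: Int,
      executorId: String,
      name: String,
      index: Int,
      addedFiles: Map[String, Long],
      addedJars: Map[String, Long],
      properties: Properties,
      task: Task[_]) {
    this(taskId, attemptNumber, executorId, name, index, addedFiles, addedJars, properties)
    task_ = task
  }

  private var task_ : Task[_] = null
}
```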
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r99458022 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -66,7 +100,8 @@ private[spark] object TaskDescription { } } - def encode(taskDescription: TaskDescription): ByteBuffer = { + @throws[TaskNotSerializableException] + def encode(taskDescription: TaskDescription, serializedTask: ByteBuffer): ByteBuffer = { --- End diff -- The previous version had only one parameter; the two-parameter version here is more readable. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r99457658 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -243,27 +245,42 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { - for (task <- tasks.flatten) { -val serializedTask = TaskDescription.encode(task) -if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => -try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + -"spark.rpc.message.maxSize (%d bytes). Consider increasing " + -"spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) -} catch { - case e: Exception => logError("Exception in error callback", e) -} + val serializedTasks = tasks.flatten.map { task => +var serializedTask: ByteBuffer = null +try { + serializedTask = TaskDescription.encode(task, task.serializedTask) + if (serializedTask.limit >= maxRpcMessageSize) { +val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + + "spark.rpc.message.maxSize or using broadcast variables for large values." +abortTaskSetManager(scheduler, task.taskId, + msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) +serializedTask = null + } else if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { + scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning). + foreach { taskSetMgr => +taskSetMgr.emittedTaskSizeWarning = true +val stageId = taskSetMgr.taskSet.stageId +logWarning(s"Stage $stageId contains a task of very large size " + + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } } +} catch { + case NonFatal(e) => +abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) } -else { +(task, serializedTask) + } + + if (!serializedTasks.exists(b => b._2 eq null)) { --- End diff -- Yes, task 2 can only wait for the next makeOffers to be called --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r97695455 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -602,6 +619,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp Future.successful(false) } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + + // abort TaskSetManager without exception + private[scheduler] def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = { +scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) --- End diff -- I see the problem you describe, but we should make sure this PR cannot introduce that issue. Although I only abstracted some code into a method and did not change the order in which it executes, to be safe I still added a lock on the scheduler object. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
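A sketch of the helper with that lock added. Everything except the `scheduler.synchronized` wrapper and the comments follows the diff above; the catch block mirrors the error handling in the original launchTasks:

```scala
private[scheduler] def abortTaskSetManager(
    scheduler: TaskSchedulerImpl,
    taskId: Long,
    msg: => String,
    exception: Option[Throwable] = None): Unit = scheduler.synchronized {
  // Holding the scheduler's lock keeps the lookup and the abort consistent with the
  // other synchronized TaskSchedulerImpl operations.
  scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
    try {
      taskSetMgr.abort(msg, exception)
    } catch {
      case e: Exception => logError("Exception in error callback", e)
    }
  }
}
```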
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 @squito My understanding is that the TaskSchedulerImpl class contains many synchronized blocks (its methods are synchronized). If one synchronized block takes a long time to execute, it blocks all the other synchronized blocks, which reduces the throughput of the TaskSchedulerImpl instance. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
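A self-contained toy illustration of that effect (not Spark code): two methods locking the same object, where a slow critical section stalls an otherwise cheap one.

```scala
object LockContentionDemo {
  class Scheduler {
    // Both methods lock `this`, like synchronized methods on a single TaskSchedulerImpl.
    def slowWork(): Unit = this.synchronized {
      Thread.sleep(1000) // stands in for serializing many large tasks under the lock
    }
    def cheapWork(): Unit = this.synchronized {
      // nearly free, but it still has to wait for slowWork() to release the lock
    }
  }

  def main(args: Array[String]): Unit = {
    val s = new Scheduler
    val t = new Thread(() => s.slowWork())
    t.start()
    Thread.sleep(50) // let the slow call take the lock first
    val start = System.nanoTime()
    s.cheapWork()
    println(f"cheapWork waited ${(System.nanoTime() - start) / 1e6}%.0f ms")
    t.join()
  }
}
```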
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r97211797 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -602,6 +619,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp Future.successful(false) } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + // abort TaskSetManager without exception + def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = { + scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { +taskSetMgr.abort(msg, exception) --- End diff -- `taskSetMgr.abort` is thread-safe; it looks fine from the calling code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96108161 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { -val serializedTask = TaskDescription.encode(task) -if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => -try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + -"spark.rpc.message.maxSize (%d bytes). Consider increasing " + -"spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) -} catch { - case e: Exception => logError("Exception in error callback", e) -} - } +val serializedTask = try { + TaskDescription.encode(task) +} catch { + case NonFatal(e) => +abortTaskSetManager(scheduler, task.taskId, --- End diff -- Yes, that's an important concern. We can first measure how much performance is lost by having to create all of the serialized tasks up front to make sure they all succeed; there may turn out to be no performance degradation at all. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
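A standalone way to get a first number for that, before touching the scheduler: time how long it takes to serialize a batch of task-sized payloads, which approximates the extra up-front work of serializing every task before launching any. The payload count and size below are made-up placeholders:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializeAllFirstTiming {
  private def serialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for ~1000 tasks of ~10 KB each; adjust to match a real workload.
    val payloads = Seq.fill(1000)(Array.fill[Byte](10 * 1024)(0))
    val start = System.nanoTime()
    val totalBytes = payloads.map(p => serialize(p).length.toLong).sum
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(f"serialized ${payloads.size} payloads ($totalBytes bytes) in $elapsedMs%.1f ms")
  }
}
```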
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96106896 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { -val serializedTask = TaskDescription.encode(task) -if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => -try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + -"spark.rpc.message.maxSize (%d bytes). Consider increasing " + -"spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) -} catch { - case e: Exception => logError("Exception in error callback", e) -} - } +val serializedTask = try { + TaskDescription.encode(task) +} catch { + case NonFatal(e) => +abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) +null } -else { + +if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) { + val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + +"spark.rpc.message.maxSize (%d bytes). Consider increasing " + +"spark.rpc.message.maxSize or using broadcast variables for large values." + abortTaskSetManager(scheduler, task.taskId, +msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) +} else if (serializedTask != null) { + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { --- End diff -- If this nested if were folded into the else if, there would be two ways to do it. 1. The following code would appear twice: ``` scala val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " + s" hostname: ${executorData.executorHost}.") executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) ``` 2. Or use the return value of the if statement to avoid the duplication: ```scala val launchTask = if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) { false } else if (serializedTask != null && serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { true } else { true } if (launchTask) { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " + s" hostname: ${executorData.executorHost}.") executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } ``` The existing code is more concise than either of those. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96104686 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,43 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer) extends Logging { --- End diff -- @squito I think squito@389fec5 is appropriate. Can I merge your commit (squito@389fec5) into this PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95933009 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,43 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer) extends Logging { --- End diff -- Another implementation: https://github.com/witgo/spark/commit/4fbf30a568ed61982e17757f9df3c35cb9d64871 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95753220 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,43 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer) extends Logging { --- End diff -- How about this? ``` scala private[spark] class TaskDescription( val taskId: Long, val attemptNumber: Int, val executorId: String, val name: String, val index: Int,// Index within this task's TaskSet val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, private var serializedTask_ : ByteBuffer) extends Logging { def this( taskId: Long, attemptNumber: Int, executorId: String, name: String, index: Int, // Index within this task's TaskSet addedFiles: Map[String, Long], addedJars: Map[String, Long], properties: Properties, task: Task[_]) { this(taskId, attemptNumber, executorId, name, index, addedFiles, addedJars, properties, null.asInstanceOf[ByteBuffer]) task_ = task } private var task_ : Task[_] = null private def serializedTask: ByteBuffer = { if (serializedTask_ == null) { // This is where we serialize the task on the driver before sending it to the executor. // This is not done when creating the TaskDescription so we can postpone this serialization // to later in the scheduling process -- particularly, // so it can happen in another thread by the CoarseGrainedSchedulerBackend. // On the executors, this will already be populated by decode serializedTask_ = try { ByteBuffer.wrap(Utils.serialize(task_)) } catch { case NonFatal(e) => val msg = s"Failed to serialize task $taskId, not attempting to retry it." logError(msg, e) throw new TaskNotSerializableException(e) } } serializedTask_ } def getTask[_](loader: ClassLoader): Task[_] = { if (task_ == null) { task_ = Utils.deserialize(serializedTask, loader).asInstanceOf[Task[_]] } return task_ } override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index) } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
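Reduced to a standalone form, the pattern in that proposal is "cache the serialized bytes and produce them on first access": the cost, and any failure, moves from construction time to first use. The names below are illustrative, not the PR's:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Wraps a value and serializes it only when the bytes are first requested.
class LazilySerialized[T <: AnyRef](value: T) {
  private var cached: ByteBuffer = null

  def bytes: ByteBuffer = {
    if (cached == null) {
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      try oos.writeObject(value) finally oos.close() // may throw NotSerializableException
      cached = ByteBuffer.wrap(bos.toByteArray)
    }
    cached
  }
}
```

The discussion above is essentially about where this logic should live in TaskDescription (a plain def versus a lazy val) and how a serialization failure should be reported (TaskNotSerializableException).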
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95732717 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -245,6 +245,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { val serializedTask = TaskDescription.encode(task) +if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { --- End diff -- If the size of serializedTask exceeds `maxRpcMessageSize`, RPC cannot deliver it to the executor at all; exceeding `TaskSetManager.TASK_SIZE_TO_WARN_KB` does not cause that problem. This code is added only to stay consistent with the previous behavior. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
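The two thresholds therefore act differently: one is a hard limit that aborts the task set, the other only triggers a warning. A standalone sketch of that decision; the constant values shown are Spark's usual defaults, assumed here for illustration:

```scala
object TaskSizeCheck {
  // spark.rpc.message.maxSize defaults to 128 MB; TaskSetManager.TASK_SIZE_TO_WARN_KB is 100.
  val maxRpcMessageSize: Long = 128L * 1024 * 1024
  val taskSizeToWarnKb: Long = 100

  sealed trait Decision
  case object Abort extends Decision          // cannot be delivered over RPC at all
  case object WarnAndLaunch extends Decision  // deliverable, but suspiciously large
  case object Launch extends Decision

  def check(serializedSize: Long): Decision =
    if (serializedSize >= maxRpcMessageSize) Abort
    else if (serializedSize > taskSizeToWarnKb * 1024) WarnAndLaunch
    else Launch
}
```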
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95731120 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -517,6 +518,32 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } + test("unserializable partition") { +val shuffleMapRdd = new MyRDD(sc, 2, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, new Partitioner { + override def numPartitions = 1 + + override def getPartition(key: Any) = 1 + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream): Unit = { +throw new NotSerializableException() + } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream): Unit = {} +}) + +// Submit a map stage by itself +submitMapStage(shuffleDep) +assert(failure.getMessage.startsWith( + "Job aborted due to stage failure: Task not serializable")) +sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) +assert(sparkListener.failedStages.contains(0)) +assert(sparkListener.failedStages.size === 1) +assertDataStructuresEmpty() --- End diff -- As above --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95731056 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -517,6 +518,32 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } + test("unserializable partition") { --- End diff -- The "unserializable task" test at line 507 only verifies the task itself. This test should be retained; I will add a new test in CoarseGrainedSchedulerBackendSuite. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95723533 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,36 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer, +private var task_ : Task[_] = null) extends Logging { --- End diff -- @kayousterhout I need some time to write the code --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95492638 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,36 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer, +private var task_ : Task[_] = null) extends Logging { + + def this( + taskId: Long, + attemptNumber: Int, + executorId: String, + name: String, + index: Int, // Index within this task's TaskSet + addedFiles: Map[String, Long], + addedJars: Map[String, Long], + properties: Properties, + task: Task[_]) { +this(taskId, attemptNumber, executorId, name, index, + addedFiles, addedJars, properties, null, task) + } + + lazy val serializedTask: ByteBuffer = { +if (serializedTask_ == null) { + serializedTask_ = try { +ByteBuffer.wrap(Utils.serialize(task_)) + } catch { +case NonFatal(e) => + val msg = s"Failed to serialize task $taskId, not attempting to retry it." + logError(msg, e) + throw new TaskNotSerializableException(e) + } +} +serializedTask_ --- End diff -- OK, I agree with you; I will modify the code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95305125 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -592,47 +579,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1) } - test("do not emit warning when serialized task is small") { --- End diff -- Ok, I will add this test case back. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 @squito In local mode, performance is relatively less important; we only need to guarantee that there is no performance degradation there. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org