RexXiong commented on code in PR #966:
URL:
https://github.com/apache/incubator-celeborn/pull/966#discussion_r1022226679
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -71,65 +71,36 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
override def receive(client: TransportClient, msg: RequestMessage): Unit =
msg match {
case pushData: PushData =>
- try {
- rpcSource.updateMessageMetrics(pushData, pushData.body().size())
- handlePushData(
- pushData,
- new RpcResponseCallback {
- override def onSuccess(response: ByteBuffer): Unit = {
- client.getChannel.writeAndFlush(new RpcResponse(
- pushData.requestId,
- new NioManagedBuffer(response)))
- }
-
- override def onFailure(e: Throwable): Unit = {
- logError(
- "[processPushData] Process pushData onFailure! ShuffleKey: "
- + pushData.shuffleKey + ", partitionUniqueId: " +
pushData.partitionUniqueId,
- e)
- client.getChannel.writeAndFlush(new
RpcFailure(pushData.requestId, e.getMessage))
- }
- })
- } catch {
- case e: Exception =>
- logError(s"Error while handlePushData $pushData", e)
- client.getChannel.writeAndFlush(new RpcFailure(
- pushData.requestId,
- Throwables.getStackTraceAsString(e)))
- } finally {
- pushData.body().release()
- }
- case pushMergedData: PushMergedData =>
- try {
- rpcSource.updateMessageMetrics(pushMergedData,
pushMergedData.body().size())
- handlePushMergedData(
- pushMergedData,
- new RpcResponseCallback {
- override def onSuccess(response: ByteBuffer): Unit = {
- client.getChannel.writeAndFlush(new RpcResponse(
- pushMergedData.requestId,
- new NioManagedBuffer(response)))
- }
-
- override def onFailure(e: Throwable): Unit = {
- logError(
- "[processPushMergedData] Process PushMergedData onFailure!
ShuffleKey: " +
- pushMergedData.shuffleKey +
- ", partitionUniqueId: " +
pushMergedData.partitionUniqueIds.mkString(","),
- e)
- client.getChannel.writeAndFlush(
- new RpcFailure(pushMergedData.requestId, e.getMessage()))
- }
- })
- } catch {
- case e: Exception =>
- logError(s"Error while handlePushMergedData $pushMergedData", e);
- client.getChannel.writeAndFlush(new RpcFailure(
- pushMergedData.requestId,
- Throwables.getStackTraceAsString(e)));
- } finally {
- pushMergedData.body().release()
- }
+ handleCore(
+ client,
+ "PushData",
Review Comment:
how about use RequestMessage.type instead of hardcode action as
"PushData/PushMergeData"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]