Hi Shixiong:
Thanks for the reply. You are right. It seems it only supports the
following two types.
I will retry by adding FileRegion type.
protected long calculateSize(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
return -1;
}
2017-06-14 1:34 GMT+08:00 Shixiong(Ryan) Zhu <[email protected]>:
> I took a look at ChannelTrafficShapingHandler. Looks like it's because it
> doesn't support FileRegion. Spark's messages use this interface.
> See org.apache.spark.network.protocol.MessageWithHeader.
>
> On Tue, Jun 13, 2017 at 4:17 AM, Niu Zhaojie <[email protected]> wrote:
>
>> Hi All:
>>
>> I am trying to control the network read/write speed with
>> ChannelTrafficShapingHandler provided by Netty.
>>
>>
>> In TransportContext.java
>>
>> I modify it as below:
>>
>> public TransportChannelHandler initializePipeline(
>> SocketChannel channel,
>> RpcHandler channelRpcHandler) {
>> try {
>> // added by zhaojie
>> logger.info("want to try control read bandwidth on host: " + host);
>> final ChannelTrafficShapingHandler channelShaping = new
>> ChannelTrafficShapingHandler(50, 50, 1000);
>>
>> TransportChannelHandler channelHandler = createChannelHandler(channel,
>> channelRpcHandler);
>>
>> channel.pipeline()
>> .addLast("encoder", ENCODER)
>> .addLast(TransportFrameDecoder.HANDLER_NAME,
>> NettyUtils.createFrameDecoder())
>> .addLast("decoder", DECODER)
>> .addLast("channelTrafficShaping", channelShaping)
>> .addLast("idleStateHandler", new IdleStateHandler(0, 0,
>> conf.connectionTimeoutMs() / 1000))
>> // NOTE: Chunks are currently guaranteed to be returned in the
>> order of request, but this
>> // would require more logic to guarantee if this were not part
>> of the same event loop.
>> .addLast("handler", channelHandler);
>>
>>
>> I create a ChannelTrafficShapingHandler and register it into the
>> pipeline of the channel. I set the write and read speed as 50kb/sec in the
>> constructor.
>> Except for it, what else do I need to do?
>>
>> However, it does not work. Is this idea correct? Am I missing something?
>>
>> Is there any better way ?
>>
>> Thanks.
>>
>> --
>> *Regards,*
>> *Zhaojie*
>>
>>
>
--
*Regards,*
*Zhaojie*