[ https://issues.apache.org/jira/browse/SPARK-13652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15178715#comment-15178715 ]
Shixiong Zhu edited comment on SPARK-13652 at 3/3/16 10:18 PM:
---------------------------------------------------------------

[~huang_yuu] I tested my patch using your code and it worked. Thanks for reporting it and nice reproducer.

was (Author: zsxwing):
[~huang_yuu] I tested my patch using your code and it worked. Thanks for reporting it and new reproducer.

> spark netty network issue
> -------------------------
>
>                 Key: SPARK-13652
>                 URL: https://issues.apache.org/jira/browse/SPARK-13652
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.5.1, 1.5.2, 1.6.0
>            Reporter: huangyu
>         Attachments: RankHandler.java, Test.java
>
>
> TransportClient is not thread safe: if it is called from multiple threads,
> the messages can't be encoded and decoded correctly. Below is my code, and it
> prints wrong messages.
> {code}
> public static void main(String[] args) throws IOException, InterruptedException {
>     // Echo server: RankHandler returns every request unchanged.
>     TransportServer server = new TransportContext(
>         new TransportConf("test", new MapConfigProvider(new HashMap<String, String>())),
>         new RankHandler()).
>         createServer(8081, new LinkedList<TransportServerBootstrap>());
>     TransportContext context = new TransportContext(
>         new TransportConf("test", new MapConfigProvider(new HashMap<String, String>())),
>         new NoOpRpcHandler(), true);
>     final TransportClientFactory clientFactory = context.createClientFactory();
>     List<Thread> ts = new ArrayList<>();
>     // 10 threads, each sending the values 0..999 and checking the echoed reply.
>     for (int i = 0; i < 10; i++) {
>         ts.add(new Thread(new Runnable() {
>             @Override
>             public void run() {
>                 for (int j = 0; j < 1000; j++) {
>                     try {
>                         ByteBuf buf = Unpooled.buffer(8);
>                         buf.writeLong((long) j);
>                         ByteBuffer byteBuffer = clientFactory.createClient("localhost", 8081).
>                             sendRpcSync(buf.nioBuffer(), Long.MAX_VALUE);
>                         long response = byteBuffer.getLong();
>                         if (response != j) {
>                             System.err.println("send:" + j + ",response:" + response);
>                         }
>                     } catch (IOException e) {
>                         e.printStackTrace();
>                     }
>                 }
>             }
>         }));
>         ts.get(i).start();
>     }
>     for (Thread t : ts) {
>         t.join();
>     }
>     server.close();
> }
>
> public class RankHandler extends RpcHandler {
>     private final Logger logger = LoggerFactory.getLogger(RankHandler.class);
>     private final StreamManager streamManager;
>
>     public RankHandler() {
>         this.streamManager = new OneForOneStreamManager();
>     }
>
>     @Override
>     public void receive(TransportClient client, ByteBuffer msg, RpcResponseCallback callback) {
>         // Echo the request back to the caller.
>         callback.onSuccess(msg);
>     }
>
>     @Override
>     public StreamManager getStreamManager() {
>         return streamManager;
>     }
> }
> {code}
> It prints output like the following:
> send:221,response:222
> send:233,response:234
> send:312,response:313
> send:358,response:359
> ...
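The send-and-verify pattern used by the reproducer can be sketched without Spark. This is only an illustration of the check, not of the bug itself: `EchoCheck`, `echo`, and `countMismatches` are hypothetical names that do not appear in the issue, and the in-process `echo` function is an assumed stand-in for the `TransportClient`/`RankHandler` round trip. Because this stand-in copies each request into a fresh buffer, every response should match the value sent.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

public class EchoCheck {
    // Hypothetical stand-in for the echo server: copies the request into a
    // fresh buffer, so the response never shares storage with the request.
    public static ByteBuffer echo(ByteBuffer request) {
        ByteBuffer copy = ByteBuffer.allocate(request.remaining());
        copy.put(request.duplicate());
        copy.flip();
        return copy;
    }

    // Starts `threads` workers; each sends 0..perThread-1 through `transport`
    // and counts the responses that differ from the value that was sent.
    public static int countMismatches(int threads, int perThread,
                                      UnaryOperator<ByteBuffer> transport) {
        AtomicInteger mismatches = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    ByteBuffer request = ByteBuffer.allocate(8);
                    request.putLong(j);
                    request.flip();
                    long response = transport.apply(request).getLong();
                    if (response != j) {
                        mismatches.incrementAndGet();
                    }
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return mismatches.get();
    }

    public static void main(String[] args) {
        // Same shape as the reproducer: 10 threads, 1000 messages each.
        System.out.println("mismatches: " + countMismatches(10, 1000, EchoCheck::echo));
    }
}
```

Swapping a different `transport` function into `countMismatches` would let the same harness exercise another round-trip implementation; any nonzero count corresponds to the `send:...,response:...` lines reported in the issue.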