[ https://issues.apache.org/jira/browse/MESOS-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexander Rojas updated MESOS-3705:
-----------------------------------
    Description:
[HTTP 1.1 Pipelining|https://en.wikipedia.org/wiki/HTTP_pipelining] describes a mechanism by which multiple HTTP requests can be sent over a single socket. The requirement here is that responses must be sent in the same order in which the requests were made.

Libprocess has some mechanisms built in to deal with pipelining when multiple HTTP requests are made. It is still possible, however, to create a situation in which responses are scrambled with respect to the order in which the requests arrived.

Consider the situation in which there are two libprocess processes, {{processA}} and {{processB}}, each running in a different thread, {{thread2}} and {{thread3}} respectively. The [{{ProcessManager}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L374] runs in {{thread1}}.

{{processA}} is of type {{ProcessA}}, which looks roughly as follows:

{code}
class ProcessA : public Process<ProcessA>
{
public:
  ProcessA() {}

  Future<http::Response> foo(const http::Request&) {
    // … Do something …
    return http::OK();
  }

protected:
  virtual void initialize() {
    // Requests for "<process id>/foo" are handled by `foo`.
    route("/foo", None(), &ProcessA::foo);
  }
};
{code}

{{processB}} is of type {{ProcessB}}, which is just like {{ProcessA}} but routes {{"bar"}} instead of {{"foo"}}.

The situation in which the bug arises is the following:

# Two requests, one for {{"http://server_uri/(1)/foo"}} and one for {{"http://server_uri/(2)/bar"}}, are made over the same socket.
# The first request arrives at [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202], which runs in {{thread1}}. It creates an {{HttpEvent}} and delivers it to the handler, in this case {{processA}}.
# [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361] enqueues the HTTP event into the {{processA}} queue. This happens in {{thread1}}.
# The second request arrives at [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202], which is still running in {{thread1}}. Another {{HttpEvent}} is created and delivered to the handler, in this case {{processB}}.
# [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361] enqueues the HTTP event into the {{processB}} queue. This happens in {{thread1}}.
# {{Thread2}} is blocked, so {{processA}} cannot handle the first request; it is stuck in the queue.
# {{Thread3}} is idle, so it picks up the request to {{processB}} immediately.
# [{{ProcessBase::visit(HttpEvent)}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3073] is called in {{thread3}}, which in turn [dispatches|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3106] the response's future to the {{HttpProxy}} associated with the socket on which the request arrived.

At this last point the bug is evident: the response from {{processB}} will be sent before the response from {{processA}}, even if that handler takes a long time and {{ProcessA::foo()}} actually finishes first. The responses are not sent in the order in which the requests were made.

h1. Reproducer

The following is a test which successfully reproduces the issue:

{code:title=3rdparty/libprocess/src/tests/http_tests.cpp}
#include <process/latch.hpp>

using process::Latch;

using testing::InvokeWithoutArgs;

// This test tries to force a situation in which HTTP pipelining is scrambled.
// It does so by having two actors to which three requests are made, the first
// two requests to the first actor and a third request to the second actor.
// The first request will block the first actor long enough to allow the second
// actor to process the third request. Since the first actor will not be able to
// handle any event until it is done processing the first request, the third
// request is finished before the second even starts.
// The ultimate goal of the test is to alter the order in which
// `ProcessBase::visit(HttpEvent)` is executed for the different events
// with respect to the order in which the requests arrived.
TEST(HTTPConnectionTest, ComplexPipelining)
{
  Http server1, server2;

  Future<http::Request> get1, get2, get3;
  Latch latch;

  // The first call on server1 blocks on the latch before responding.
  EXPECT_CALL(*server1.process, get(_))
    .WillOnce(DoAll(FutureArg<0>(&get1),
                    InvokeWithoutArgs([&latch]() { latch.await(); }),
                    Return(http::OK("1"))))
    .WillOnce(DoAll(FutureArg<0>(&get2),
                    Return(http::OK("2"))));

  EXPECT_CALL(*server2.process, get(_))
    .WillOnce(DoAll(FutureArg<0>(&get3),
                    Return(http::OK("3"))));

  auto url1 = http::URL(
      "http",
      server1.process->self().address.ip,
      server1.process->self().address.port,
      server1.process->self().id + "/get");

  // Same address as url1; only the process id in the path differs.
  auto url2 = http::URL(
      "http",
      server1.process->self().address.ip,
      server1.process->self().address.port,
      server2.process->self().id + "/get");

  // Create a connection to the server for HTTP pipelining.
  Future<http::Connection> connect = http::connect(url1);
  AWAIT_READY(connect);

  http::Connection connection = connect.get();

  http::Request request1;
  request1.method = "GET";
  request1.url = url1;
  request1.keepAlive = true;
  request1.body = "1";
  Future<http::Response> response1 = connection.send(request1);

  http::Request request2 = request1;
  request2.body = "2";
  Future<http::Response> response2 = connection.send(request2);

  http::Request request3;
  request3.method = "GET";
  request3.url = url2;
  request3.keepAlive = true;
  request3.body = "3";
  Future<http::Response> response3 = connection.send(request3);

  // Verify that request1 arrived at server1 and it is the right request.
  // Now server1 is blocked processing request1 and cannot pick up more events
  // in the queue.
  AWAIT_READY(get1);
  EXPECT_EQ(request1.body, get1->body);

  // Verify that request3 arrived at server2 and it is the right request.
  AWAIT_READY(get3);
  EXPECT_EQ(request3.body, get3->body);

  // Request2 hasn't been picked up since server1 is still blocked serving
  // request1.
  EXPECT_TRUE(get2.isPending());

  // Free server1 so it can serve request2.
  latch.trigger();

  // Verify that request2 arrived at server1 and it is the right request.
  AWAIT_READY(get2);
  EXPECT_EQ(request2.body, get2->body);

  // Wait for all responses.
  AWAIT_READY(response1);
  AWAIT_READY(response2);
  AWAIT_READY(response3);

  // If pipelining works as expected, even though server2 finished processing
  // its request before server1 even began with request2, the responses should
  // arrive in the order they were made.
  EXPECT_EQ(request1.body, response1->body);
  EXPECT_EQ(request2.body, response2->body);
  EXPECT_EQ(request3.body, response3->body);

  AWAIT_READY(connection.disconnect());
  AWAIT_READY(connection.disconnected());
}
{code}

  was:
[HTTP 1.1 Pipelining|https://en.wikipedia.org/wiki/HTTP_pipelining] describes a mechanism by which multiple HTTP requests can be sent over a single socket.
The requirement here is that responses must be sent in the same order in which the requests were made.

Libprocess has some mechanisms built in to deal with pipelining when multiple HTTP requests are made. It is still possible, however, to create a situation in which responses are scrambled with respect to the order in which the requests arrived.

Consider the situation in which there are two libprocess processes, {{processA}} and {{processB}}, each running in a different thread, {{thread2}} and {{thread3}} respectively. The [{{ProcessManager}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L374] runs in {{thread1}}.

{{processA}} is of type {{ProcessA}}, which looks roughly as follows:

{code}
class ProcessA : public Process<ProcessA>
{
public:
  ProcessA() {}

  Future<http::Response> foo(const http::Request&) {
    // … Do something …
    return http::OK();
  }

protected:
  virtual void initialize() {
    route("/foo", None(), &ProcessA::foo);
  }
};
{code}

{{processB}} is of type {{ProcessB}}, which is just like {{ProcessA}} but routes {{"bar"}} instead of {{"foo"}}.

The situation in which the bug arises is the following:

# Two requests, one for {{"http://server_uri/(1)/foo"}} and one for {{"http://server_uri/(2)/bar"}}, are made over the same socket.
# The first request arrives at [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202], which runs in {{thread1}}. It creates an {{HttpEvent}} and delivers it to the handler, in this case {{processA}}.
# [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361] enqueues the HTTP event into the {{processA}} queue. This happens in {{thread1}}.
# The second request arrives at [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202], which is still running in {{thread1}}. Another {{HttpEvent}} is created and delivered to the handler, in this case {{processB}}.
# [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361] enqueues the HTTP event into the {{processB}} queue. This happens in {{thread1}}.
# {{Thread2}} is blocked, so {{processA}} cannot handle the first request; it is stuck in the queue.
# {{Thread3}} is idle, so it picks up the request to {{processB}} immediately.
# [{{ProcessBase::visit(HttpEvent)}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3073] is called in {{thread3}}, which in turn [dispatches|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3106] the response's future to the {{HttpProxy}} associated with the socket on which the request arrived.

At this last point the bug is evident: the response from {{processB}} will be sent before the response from {{processA}}, even if that handler takes a long time and {{ProcessA::foo()}} actually finishes first. The responses are not sent in the order in which the requests were made.

h1. Reproducer

The following is a test which successfully reproduces the issue:

{code}
class PipelineScramblerProcess : public Process<PipelineScramblerProcess>
{
public:
  PipelineScramblerProcess()
    : ProcessBase(ID::generate("PipelineScramblerProcess")) {}

  // Blocks this process until `trigger` is completed.
  void block(const Future<Nothing>& trigger)
  {
    trigger.await();
  }

  Future<http::Response> get(const http::Request& request)
  {
    // Only the process that owns the promise releases the blocked one.
    if (promise_) {
      promise_->set(Nothing());
    }

    return http::OK(self().id);
  }

  void setPromise(std::unique_ptr<Promise<Nothing>>& promise)
  {
    promise_ = std::move(promise);
  }

protected:
  virtual void initialize()
  {
    route("/get", None(), &PipelineScramblerProcess::get);
  }

private:
  std::unique_ptr<Promise<Nothing>> promise_;
};


TEST(HTTPConnectionTest, ComplexPipelining)
{
  PipelineScramblerProcess blocked;
  spawn(blocked);
  PipelineScramblerProcess unblocked;
  spawn(unblocked);

  ASSERT_EQ(blocked.self().address.ip, unblocked.self().address.ip);
  ASSERT_EQ(blocked.self().address.port, unblocked.self().address.port);

  std::unique_ptr<Promise<Nothing>> promise(new Promise<Nothing>());

  // Block the first process so it cannot process the first request until
  // the second request is finished.
  dispatch(blocked, &PipelineScramblerProcess::block, promise->future());

  // Promise will be set once 'unblocked' serves the second request.
  unblocked.setPromise(promise);

  // Get connection for HTTP pipelining.
  Future<http::Connection> connect = http::connect(http::URL(
      "http",
      blocked.self().address.ip,
      blocked.self().address.port));

  AWAIT_READY(connect);
  http::Connection connection = connect.get();

  http::Request blockedRequest;
  blockedRequest.method = "GET";
  blockedRequest.url = http::URL(
      "http",
      blocked.self().address.ip,
      blocked.self().address.port,
      blocked.self().id + "/get");
  blockedRequest.keepAlive = true;

  Future<http::Response> blockedResponse = connection.send(blockedRequest);

  http::Request unblockedRequest;
  unblockedRequest.method = "GET";
  unblockedRequest.url = http::URL(
      "http",
      unblocked.self().address.ip,
      unblocked.self().address.port,
      unblocked.self().id + "/get");
  unblockedRequest.keepAlive = true;

  Future<http::Response> unblockedResponse =
    connection.send(unblockedRequest);

  AWAIT_READY(blockedResponse);
  AWAIT_READY(unblockedResponse);

  EXPECT_EQ(blocked.self().id, blockedResponse->body);
  EXPECT_EQ(unblocked.self().id, unblockedResponse->body);

  AWAIT_READY(connection.disconnect());
  AWAIT_READY(connection.disconnected());

  terminate(blocked);
  wait(blocked);
  terminate(unblocked);
  wait(unblocked);
}
{code}


> HTTP Pipelining doesn't keep order of requests
> ----------------------------------------------
>
>                 Key: MESOS-3705
>                 URL: https://issues.apache.org/jira/browse/MESOS-3705
>             Project: Mesos
>          Issue Type: Bug
>          Components: libprocess
>    Affects Versions: 0.24.0
>            Reporter: Alexander Rojas
>            Assignee: Alexander Rojas
>              Labels: http, libprocess, mesosphere
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
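
For illustration only, the ordering requirement from the description can be sketched outside of libprocess. The snippet below is a minimal, single-threaded sketch and not the libprocess implementation or a proposed patch: {{InOrderResponder}}, {{reserve}}, {{complete}} and {{simulateScrambledHandlers}} are hypothetical names. It assumes that a slot for each response is reserved at the moment the request arrives (the point where {{ProcessManager::handle}} runs in the scenario above), so a handler that finishes early cannot push its response ahead of an earlier request.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical helper (not part of libprocess): buffers the responses of one
// connection and releases them strictly in request-arrival order.
class InOrderResponder
{
public:
  // Called when a request arrives; returns the id of its response slot.
  std::size_t reserve()
  {
    slots.emplace_back();
    return base + slots.size() - 1;
  }

  // Called when a handler finishes, possibly out of arrival order.
  void complete(std::size_t id, const std::string& body)
  {
    Slot& slot = slots[id - base];
    slot.ready = true;
    slot.body = body;

    // Flush every finished response sitting at the head of the queue.
    while (!slots.empty() && slots.front().ready) {
      sent.push_back(slots.front().body);
      slots.pop_front();
      ++base;
    }
  }

  // Responses in the order they would be written to the socket.
  std::vector<std::string> sent;

private:
  struct Slot
  {
    bool ready = false;
    std::string body;
  };

  std::deque<Slot> slots;
  std::size_t base = 0; // Id of the slot currently at the front.
};

// Mirrors the scenario in the description: the request for "foo" arrives
// first, but the handler for "bar" finishes first.
std::vector<std::string> simulateScrambledHandlers()
{
  InOrderResponder responder;

  std::size_t foo = responder.reserve(); // First request arrives.
  std::size_t bar = responder.reserve(); // Second request arrives.

  responder.complete(bar, "bar"); // The idle process answers immediately.
  responder.complete(foo, "foo"); // The blocked process answers later.

  return responder.sent;
}
```

In this sketch the second handler completes first, yet its response is held back until the first slot is filled, so the responses leave in the order of the requests.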