Modified: trunk/Source/WebKit2/ChangeLog (139513 => 139514)
--- trunk/Source/WebKit2/ChangeLog 2013-01-11 23:54:56 UTC (rev 139513)
+++ trunk/Source/WebKit2/ChangeLog 2013-01-12 00:01:56 UTC (rev 139514)
@@ -1,3 +1,29 @@
+2013-01-11 Alexey Proskuryakov <a...@apple.com>
+
+ [WK2] Make it possible to send sync messages from secondary threads
+ https://bugs.webkit.org/show_bug.cgi?id=106708
+
+ Reviewed by Anders Carlsson.
+
+ It is hugely beneficial to implement sync messages at Connection level, because
+ ad hoc code that blocks a thread and wakes it up when a reply arrives on main
+ thread can't be made equally performant. A CoreOPC MessageDecoder can be moved across
+ threads, which can't be done with a decoded argument passed by reference to client code.
+
+ Sync messages from secondary threads are tracked in much simpler data structure
+ than client thread ones, because we don't need to be concerned with incoming messages.
+
+ * Platform/CoreIPC/Connection.cpp:
+ (Connection::SecondaryThreadPendingSyncReply):
+ (CoreIPC::Connection::SecondaryThreadPendingSyncReply::SecondaryThreadPendingSyncReply):
+ (CoreIPC::Connection::createSyncMessageEncoder):
+ (CoreIPC::Connection::sendSyncMessage):
+ (CoreIPC::Connection::sendSyncMessageFromSecondaryThread):
+ (CoreIPC::Connection::processIncomingSyncReply):
+ (CoreIPC::Connection::connectionDidClose):
+
+ * Platform/CoreIPC/Connection.h: Also corrected a misleading comment.
+
2013-01-11 Dan Bernstein <m...@apple.com>
Exclude unused resources.
Modified: trunk/Source/WebKit2/Platform/CoreIPC/Connection.cpp (139513 => 139514)
--- trunk/Source/WebKit2/Platform/CoreIPC/Connection.cpp 2013-01-11 23:54:56 UTC (rev 139513)
+++ trunk/Source/WebKit2/Platform/CoreIPC/Connection.cpp 2013-01-12 00:01:56 UTC (rev 139514)
@@ -93,6 +93,18 @@
Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
};
+class Connection::SecondaryThreadPendingSyncReply {
+WTF_MAKE_NONCOPYABLE(SecondaryThreadPendingSyncReply);
+public:
+ SecondaryThreadPendingSyncReply() : replyDecoder(0) { }
+
+ // The reply decoder, will be null if there was an error processing the sync message on the other side.
+ MessageDecoder* replyDecoder;
+
+ BinarySemaphore semaphore;
+};
+
+
PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop)
{
MutexLocker locker(syncMessageStateMapMutex());
@@ -297,7 +309,8 @@
OwnPtr<MessageEncoder> encoder = MessageEncoder::create(messageReceiverName, messageName, destinationID);
// Encode the sync request ID.
- syncRequestID = ++m_syncRequestID;
+ COMPILE_ASSERT(sizeof(m_syncRequestID) == sizeof(int64_t), CanUseAtomicIncrement);
+ syncRequestID = atomicIncrement(reinterpret_cast<int64_t volatile*>(&m_syncRequestID));
encoder->encode(syncRequestID);
return encoder.release();
@@ -386,8 +399,11 @@
PassOwnPtr<MessageDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<MessageEncoder> encoder, double timeout, unsigned syncSendFlags)
{
- // We only allow sending sync messages from the client run loop.
- ASSERT(RunLoop::current() == m_clientRunLoop);
+ if (RunLoop::current() != m_clientRunLoop) {
+ // No flags are supported for synchronous messages sent from secondary threads.
+ ASSERT(!syncSendFlags);
+ return sendSyncMessageFromSecondaryThread(messageID, syncRequestID, encoder, timeout);
+ }
if (!isValid()) {
didFailToSendSyncMessage();
@@ -420,12 +436,55 @@
m_pendingSyncReplies.removeLast();
}
- if (!reply)
- didFailToSendSyncMessage();
+ // FIXME: Should we call didFailToSendSyncMessage()? It may be unexpected to get in on a background thread.
return reply.release();
}
+PassOwnPtr<MessageDecoder> Connection::sendSyncMessageFromSecondaryThread(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<MessageEncoder> encoder, double timeout)
+{
+ ASSERT(RunLoop::current() != m_clientRunLoop);
+
+ if (!isValid()) {
+ didFailToSendSyncMessage();
+ return nullptr;
+ }
+
+ SecondaryThreadPendingSyncReply pendingReply;
+
+ // Push the pending sync reply information on our stack.
+ {
+ MutexLocker locker(m_syncReplyStateMutex);
+ if (!m_shouldWaitForSyncReplies) {
+ didFailToSendSyncMessage();
+ return nullptr;
+ }
+
+ ASSERT(!m_secondaryThreadPendingSyncReplyMap.contains(syncRequestID));
+ m_secondaryThreadPendingSyncReplyMap.add(syncRequestID, &pendingReply);
+ }
+
+ sendMessage(messageID.messageIDWithAddedFlags(MessageID::SyncMessage), encoder, 0);
+
+ // Use a really long timeout.
+ if (timeout == NoTimeout)
+ timeout = 1e10;
+
+ pendingReply.semaphore.wait(timeout);
+
+ // Finally, pop the pending sync reply information.
+ {
+ MutexLocker locker(m_syncReplyStateMutex);
+ ASSERT(m_secondaryThreadPendingSyncReplyMap.contains(syncRequestID));
+ m_secondaryThreadPendingSyncReplyMap.remove(syncRequestID);
+ }
+
+ if (!pendingReply.replyDecoder)
+ didFailToSendSyncMessage();
+
+ return adoptPtr(pendingReply.replyDecoder);
+}
+
PassOwnPtr<MessageDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, double timeout, unsigned syncSendFlags)
{
// Use a really long timeout.
@@ -502,7 +561,16 @@
return;
}
- // If we get here, it means we got a reply for a message that wasn't in the sync request stack.
+ // If it's not a reply to any primary thread message, check if it is a reply to a secondary thread one.
+ SecondaryThreadPendingSyncReplyMap::iterator secondaryThreadReplyMapItem = m_secondaryThreadPendingSyncReplyMap.find(decoder->destinationID());
+ if (secondaryThreadReplyMapItem != m_secondaryThreadPendingSyncReplyMap.end()) {
+ SecondaryThreadPendingSyncReply* reply = secondaryThreadReplyMapItem->value;
+ ASSERT(!reply->replyDecoder);
+ reply->replyDecoder = decoder.leakPtr();
+ reply->semaphore.signal();
+ }
+
+ // If we get here, it means we got a reply for a message that wasn't in the sync request stack or map.
// This can happen if the send timed out, so it's fine to ignore.
}
@@ -570,6 +638,9 @@
if (!m_pendingSyncReplies.isEmpty())
m_syncMessageState->wakeUpClientRunLoop();
+
+ for (SecondaryThreadPendingSyncReplyMap::iterator iter = m_secondaryThreadPendingSyncReplyMap.begin(); iter != m_secondaryThreadPendingSyncReplyMap.end(); ++iter)
+ iter->value->semaphore.signal();
}
if (m_didCloseOnConnectionWorkQueueCallback)
Modified: trunk/Source/WebKit2/Platform/CoreIPC/Connection.h (139513 => 139514)
--- trunk/Source/WebKit2/Platform/CoreIPC/Connection.h 2013-01-11 23:54:56 UTC (rev 139513)
+++ trunk/Source/WebKit2/Platform/CoreIPC/Connection.h 2013-01-12 00:01:56 UTC (rev 139514)
@@ -182,6 +182,7 @@
PassOwnPtr<MessageEncoder> createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID);
bool sendMessage(MessageID, PassOwnPtr<MessageEncoder>, unsigned messageSendFlags = 0);
PassOwnPtr<MessageDecoder> sendSyncMessage(MessageID, uint64_t syncRequestID, PassOwnPtr<MessageEncoder>, double timeout, unsigned syncSendFlags = 0);
+ PassOwnPtr<MessageDecoder> sendSyncMessageFromSecondaryThread(MessageID, uint64_t syncRequestID, PassOwnPtr<MessageEncoder>, double timeout);
bool sendSyncReply(PassOwnPtr<MessageEncoder>);
void wakeUpRunLoop();
@@ -305,7 +306,7 @@
// message on the other side.
MessageDecoder* replyDecoder;
- // Will be set to true once a reply has been received or an error occurred.
+ // Will be set to true once a reply has been received.
bool didReceiveReply;
PendingSyncReply()
@@ -330,7 +331,7 @@
return reply.release();
}
};
-
+
class SyncMessageState;
friend class SyncMessageState;
RefPtr<SyncMessageState> m_syncMessageState;
@@ -339,6 +340,10 @@
bool m_shouldWaitForSyncReplies;
Vector<PendingSyncReply> m_pendingSyncReplies;
+ class SecondaryThreadPendingSyncReply;
+ typedef HashMap<uint64_t, SecondaryThreadPendingSyncReply*> SecondaryThreadPendingSyncReplyMap;
+ SecondaryThreadPendingSyncReplyMap m_secondaryThreadPendingSyncReplyMap;
+
#if OS(DARWIN)
// Called on the connection queue.
void receiveSourceEventHandler();