On Sun, 2009-09-06 at 20:07 +0200, Loic Dachary wrote: > -----BEGIN PGP SIGNED MESSAGE----- > Hash: SHA1 > > Hi, > > With the introduction of the patch you implemented last Friday ( long > poll 3/5 : all requests for a session are handled by the same server > ), the PokerAvatar instance that receives packets can delegate the > handling of packets to the server responsible for it. For instance, if > the table T is on server S1 and the PokerAvatar receiving the request > (responsible for the session) is on server S2, the request will have > to be sent from S2 to S1. When S1 answers, S2 will take the packets > and add them to the PokerAvatar queue, therefore acting as a reverse > proxy. > > The idea is to enable PokerExplain on the S2 server (responsible for > the session) and not on S1. Activating PokerExplain on both would lead > to chaos anyway :-) >
Hi, I added tests and modified your draft implementation; feel free to review the attached patch. The affected modules are pokeravatar, pokerservice, and pokersite, and all the new code is covered by the tests. Please note that I was not able to make the modifications you raised on IRC in time: remark #1 (12:12:07 PM) dachary: def handlePacketDefer(self, packet): (12:12:20 PM) dachary: should return the long poll deferred remark #2 (12:20:19 PM) dachary: I moved LONG_POLL handling to handlePacketDeferred instead of handlePacketLogic remark #3 (12:53:09 PM) dachary: means the PokerRestClient API is wrong again (12:55:15 PM) dachary: when a longpoll returns it calls a callback that will add the packets to the avatar I also suspect there could be timing issues with my previous patch: poker-network-pokerrestclient-deferred-api.patch Sorry for the inconvenience; I'll be able to provide support and answer your questions on these patches tomorrow. -- Johan Euphrosine <[email protected]>
Index: pokernetwork/pokerservice.py
===================================================================
--- pokernetwork/pokerservice.py (revision 6210)
+++ pokernetwork/pokerservice.py (working copy)
@@ -1725,6 +1725,11 @@
cursor.close()
def packet2resthost(self, packet):
+ #
+ # game_id is only set for packets related to a table and not for
+ # packets that are delegated but related to tournaments.
+ #
+ game_id = None
cursor = self.db.cursor()
if packet.type == PACKET_POKER_CREATE_TOURNEY:
#
@@ -1735,31 +1740,31 @@
self.message("packet2resthost: create tourney " + sql)
cursor.execute(sql)
else:
- if packet.type == PACKET_POKER_POLL:
- wheres = []
- if packet.game_id > 0:
- wheres.append("table_serial = " + str(packet.game_id))
- if packet.tourney_serial > 0:
- wheres.append(" tourney_serial = " + str(packet.tourney_serial))
- where = " AND ".join(wheres)
- elif packet.type in ( PACKET_POKER_TOURNEY_REQUEST_PLAYERS_LIST, PACKET_POKER_TOURNEY_REGISTER, PACKET_POKER_TOURNEY_UNREGISTER ):
+ if packet.type in ( PACKET_POKER_TOURNEY_REQUEST_PLAYERS_LIST, PACKET_POKER_TOURNEY_REGISTER, PACKET_POKER_TOURNEY_UNREGISTER ):
where = "tourney_serial = " + str(packet.game_id)
elif packet.type in ( PACKET_POKER_GET_TOURNEY_MANAGER, ):
where = "tourney_serial = " + str(packet.tourney_serial)
elif hasattr(packet, "game_id"):
where = "table_serial = " + str(packet.game_id)
+ game_id = packet.game_id
else:
cursor.close()
- return None
+ return (None, None)
- cursor.execute("SELECT host, port, path FROM route,resthost WHERE " +
- " route.resthost_serial = resthost.serial AND " + where)
+ cursor.execute("SELECT host, port, path FROM route,resthost " +
+ " WHERE route.resthost_serial = resthost.serial " +
+ #
+ # exclude the routes that go to this server
+ #
+ " AND resthost.serial != " + str(self.resthost_serial) +
+ " AND " + where)
+ print cursor._last_executed
if cursor.rowcount > 0:
result = cursor.fetchone()
else:
result = None
cursor.close()
- return result
+ return ( result, game_id )
def cleanUp(self, temporary_users = ''):
cursor = self.db.cursor()
Index: pokernetwork/pokersite.py
===================================================================
--- pokernetwork/pokersite.py (revision 6213)
+++ pokernetwork/pokersite.py (working copy)
@@ -226,18 +226,18 @@
#
return True
- deferred.addCallback(lambda result: self.deferRender(request, jsonp, packet))
+ deferred.addCallback(lambda result: self.deferRender(request, jsonp, packet, data))
deferred.addErrback(pipesFailed)
return server.NOT_DONE_YET
- def deferRender(self, request, jsonp, packet):
+ def deferRender(self, request, jsonp, packet, data):
if request.finished:
#
# For instance : the request was reverse-proxied to a server.
#
return True
session = request.getSession()
- d = defer.maybeDeferred(session.avatar.handlePacketDefer, packet)
+ d = defer.maybeDeferred(session.avatar.handleDistributedPacket, request, packet, data)
def render(packets):
#
# update the session information if the avatar changed
Index: pokernetwork/pokeravatar.py
===================================================================
--- pokernetwork/pokeravatar.py (revision 6210)
+++ pokernetwork/pokeravatar.py (working copy)
@@ -42,6 +42,7 @@
from pokernetwork.user import User, checkNameAndPassword
from pokernetwork.pokerpackets import *
from pokernetwork.pokerexplain import PokerExplain
+from pokernetwork.pokerrestclient import PokerRestClient
from twisted.internet import protocol, reactor, defer
DEFAULT_PLAYER_USER_DATA = { 'ready': True }
@@ -62,6 +63,9 @@
self.has_session = False
self.bugous_processing_hand = False
self.noqueuePackets()
+ self._block_longpoll_deferred = False
+ self._longpoll_deferred = None
+ self.game_id2rest_client = {}
def __str__(self):
return "PokerAvatar serial = %s, name = %s" % ( self.getSerial(), self.getName() )
@@ -247,6 +251,7 @@
# This method was introduced when we added the force-disconnect as
# the stop-gap.
self._packets_queue.extend(newPackets)
+ self.flushLongPollDeferred()
warnVal = int(.75 * self.service.getClientQueuedPacketMax())
if len(self._packets_queue) >= warnVal:
# If we have not warned yet that packet queue is getting long, warn now.
@@ -339,6 +344,69 @@
else:
return False
+ def longpollDeferred(self):
+ self._longpoll_deferred = defer.Deferred()
+ self.flushLongPollDeferred()
+
+ def blockLongPollDeferred(self):
+ self._block_longpoll_deferred = True
+
+ def unblockLongPollDeferred(self):
+ self._block_longpoll_deferred = False
+ self.flushLongPollDeferred()
+
+ def flushLongPollDeferred(self):
+ if self._block_longpoll_deferred == False and self._longpoll_deferred and len(self._packets_queue) > 0:
+ packets = self.resetPacketsQueue()
+ d = self._longpoll_deferred
+ self._longpoll_deferred = None
+ d.callback(packets)
+
+ def handleDistributedPacket(self, request, packet, data):
+ resthost, game_id = self.service.packet2resthost(packet)
+ if resthost:
+ return self.distributePacket(packet, data, resthost, game_id)
+ else:
+ return self.handlePacketDefer(packet)
+
+ def getOrCreateRestClient(self, resthost, game_id):
+ #
+ # no game_id means the request must be delegated for tournament
+ # registration or creation. Not for table interaction.
+ #
+ ( host, port, path ) = resthost
+ if game_id:
+ if not self.game_id2rest_client.has_key(game_id):
+ self.game_id2rest_client[game_id] = PokerRestClient(host, port, path, self.service.verbose)
+ client = self.game_id2rest_client[game_id]
+ else:
+ client = PokerRestClient(host, port, path, self.service.verbose)
+ return client
+
+ def distributePacket(self, packet, data, resthost, game_id):
+ ( host, port, path ) = resthost
+ client = self.getOrCreateRestClient(resthost, game_id)
+ d = client.sendPacket(packet, data)
+ d.addCallback(lambda packets: self.incomingDistributedPackets(packets, game_id))
+ return d
+
+ def incomingDistributedPackets(self, packets, game_id):
+ if game_id:
+ if game_id not in self.tables:
+ #
+ # discard client if nothing pending and not in the list
+ # of active tables
+ #
+ client = self.game_id2rest_client[game_id]
+ if ( len(client.queue.callbacks) <= 0 or
+ client.pendingLongPoll ):
+ del self.game_id2rest_client[game_id]
+ self.blockLongPollDeferred()
+ for packet in packets:
+ self.sendPacket(packet)
+ self.unblockLongPollDeferred()
+ return self.resetPacketsQueue()
+
def handlePacketDefer(self, packet):
self.queuePackets()
self.handlePacketLogic(packet)
@@ -367,7 +435,14 @@
def handlePacketLogic(self, packet):
if self.service.verbose > 2 and packet.type != PACKET_PING:
self.message("handlePacketLogic(%d): " % self.getSerial() + str(packet))
-
+ if packet.type == PACKET_POKER_LONG_POLL:
+ self.longpollDeferred()
+ return
+
+ if packet.type == PACKET_POKER_LONG_POLL_RETURN:
+ self.flushLongPollDeferred()
+ return
+
if packet.type == PACKET_POKER_EXPLAIN:
if self.setExplain(packet.value):
self.sendPacketVerbose(PacketAck())
Index: tests/test-pokeravatar.py.in
===================================================================
--- tests/test-pokeravatar.py.in (revision 6210)
+++ tests/test-pokeravatar.py.in (working copy)
@@ -57,6 +57,7 @@
currencyclient.CurrencyClient = currencyclient.FakeCurrencyClient
from pokernetwork.pokerclientpackets import *
from pokernetwork.pokertable import PokerAvatarCollection
+from pokernetwork.pokerrestclient import PokerRestClient
class ConstantDeckShuffler:
def shuffle(self, what):
@@ -3893,6 +3894,121 @@
d.addCallback(self.tourneyCreate, 0)
return d
+ PokerRestClient.longPoll = lambda arg: arg
+ PokerRestClient.deferred = None
+ PokerRestClient.sendPacket = lambda client, packet, data: PokerRestClient.deferred
+
+ # ------------------------------------------------------------------------
+ def test81_distributePacket(self):
+ self.createClients(1)
+ d = self.client_factory[0].established_deferred
+ d.addCallback(self.setupCallbackChain)
+ d.addCallback(self.login, 0)
+ d.addCallback(self.joinTable, 0, 2, 'Table2', '2-4-limit')
+ def handleDistributedPacket(x):
+ avatar = self.service.avatars[0]
+ self.service.packet2resthost = lambda packet: (('host', 11111, '/PATH'), 2)
+ d = defer.Deferred()
+ PokerRestClient.deferred = d
+ r = avatar.handleDistributedPacket(None, PacketPing(), '{ "type": "PacketPing" }')
+ r.addCallback(lambda packets: self.assertEquals(['foo'], packets))
+ r.addCallback(lambda arg: self.assertEquals(True, avatar.game_id2rest_client.has_key(2)))
+ d.callback(['foo'])
+ return d
+ d.addCallback(handleDistributedPacket)
+ return d
+ # ------------------------------------------------------------------------
+ def test82_distributePacketNoMoreActiveTable(self):
+ self.createClients(1)
+ d = self.client_factory[0].established_deferred
+ d.addCallback(self.setupCallbackChain)
+ d.addCallback(self.login, 0)
+ d.addCallback(self.joinTable, 0, 2, 'Table2', '2-4-limit')
+ def handleDistributedPacket(x):
+ avatar = self.service.avatars[0]
+ self.service.packet2resthost = lambda packet: (('host', 11111, '/PATH'), 2)
+ d = defer.Deferred()
+ def clearActiveTable(arg):
+ avatar.tables = {}
+ return arg
+ d.addCallback(clearActiveTable)
+ PokerRestClient.deferred = d
+ r = avatar.handleDistributedPacket(None, PacketPing(), '{ "type": "PacketPing" }')
+ r.addCallback(lambda packets: self.assertEquals(['foo'], packets))
+ r.addCallback(lambda arg: self.assertEquals({}, avatar.game_id2rest_client))
+ d.callback(['foo'])
+ return d
+ d.addCallback(handleDistributedPacket)
+ return d
+ # ------------------------------------------------------------------------
+ def test83_distributePacketNoGameId(self):
+ self.createClients(1)
+ d = self.client_factory[0].established_deferred
+ d.addCallback(self.setupCallbackChain)
+ d.addCallback(self.login, 0)
+ d.addCallback(self.joinTable, 0, 2, 'Table2', '2-4-limit')
+ def handleDistributedPacket(x):
+ avatar = self.service.avatars[0]
+ self.service.packet2resthost = lambda packet: (('host', 11111, '/PATH'), None)
+ d = defer.Deferred()
+ PokerRestClient.deferred = d
+ r = avatar.handleDistributedPacket(None, PacketPing(), '{ "type": "PacketPing" }')
+ r.addCallback(lambda packets: self.assertEquals(['foo'], packets))
+ r.addCallback(lambda arg: self.assertEquals({}, avatar.game_id2rest_client))
+ d.callback(['foo'])
+ return d
+ d.addCallback(handleDistributedPacket)
+ return d
+ # ------------------------------------------------------------------------
+ def test84_distributePacketNoRestHost(self):
+ self.createClients(1)
+ d = self.client_factory[0].established_deferred
+ d.addCallback(self.setupCallbackChain)
+ d.addCallback(self.login, 0)
+ d.addCallback(self.joinTable, 0, 2, 'Table2', '2-4-limit')
+ def handleDistributedPacket(x):
+ avatar = self.service.avatars[0]
+ self.service.packet2resthost = lambda packet: (None, None)
+ r = avatar.handleDistributedPacket(None, PacketPing(), '{ "type": "PacketPing" }')
+ self.assertEquals([], r)
+ d.addCallback(handleDistributedPacket)
+ return d
+ # ------------------------------------------------------------------------
+ def test85_longPoll(self):
+ self.createClients(1)
+ d = self.client_factory[0].established_deferred
+ d.addCallback(self.setupCallbackChain)
+ d.addCallback(self.login, 0)
+ d.addCallback(self.joinTable, 0, 2, 'Table2', '2-4-limit')
+ def handleLongPoll(x):
+ avatar = self.service.avatars[0]
+ avatar.handlePacketLogic(PacketPokerLongPoll())
+ self.assertNotEquals(None, avatar._longpoll_deferred)
+ self.assertEquals(False, avatar._block_longpoll_deferred)
+ avatar._longpoll_deferred.addCallback(lambda packets: self.assertEquals(PACKET_PING, packets[0].type))
+ avatar.sendPacket(PacketPing())
+ return avatar._longpoll_deferred
+ d.addCallback(handleLongPoll)
+ return d
+ # ------------------------------------------------------------------------
+ def test86_longPollReturn(self):
+ self.createClients(1)
+ d = self.client_factory[0].established_deferred
+ d.addCallback(self.setupCallbackChain)
+ d.addCallback(self.login, 0)
+ d.addCallback(self.joinTable, 0, 2, 'Table2', '2-4-limit')
+ def handleLongPollReturn(x):
+ avatar = self.service.avatars[0]
+ avatar.handlePacketLogic(PacketPokerLongPoll())
+ self.assertNotEquals(None, avatar._longpoll_deferred)
+ self.assertEquals(False, avatar._block_longpoll_deferred)
+ avatar._packets_queue = ['foo']
+ avatar._longpoll_deferred.addCallback(self.assertEquals, ['foo'])
+ avatar.handlePacketLogic(PacketPokerLongPollReturn())
+ return avatar._longpoll_deferred
+ d.addCallback(handleLongPollReturn)
+ return d
+
##############################################################################
class PokerAvatarNoClientServerTestCase(unittest.TestCase):
class MockPlayerInfo:
@@ -4574,7 +4690,7 @@
##############################################################################
def Run():
loader = runner.TestLoader()
-# loader.methodPrefix = "test0"
+# loader.methodPrefix = "test8"
suite = loader.suiteFactory()
suite.addTest(loader.loadClass(PokerAvatarTestCase))
suite.addTest(loader.loadClass(PokerAvatarNoClientServerTestCase))
Index: tests/test-pokerservice.py.in
===================================================================
--- tests/test-pokerservice.py.in (revision 6210)
+++ tests/test-pokerservice.py.in (working copy)
@@ -1062,69 +1062,80 @@
def test01_packet2resthost(self):
self.setUpService(self.xml_with_resthost)
self.service.startService()
+
+ db = self.service.db
+ db.db.query("INSERT INTO resthost VALUES (2, 'two', 'host2', 2222, 'path2')")
+ db.db.query("INSERT INTO route VALUES (102, 0, 0, 2)")
+
#
# ping is never routed
#
- resthost = self.service.packet2resthost(PacketPing())
+ resthost, game_id = self.service.packet2resthost(PacketPing())
self.assertEqual(None, resthost)
+ self.assertEqual(None, game_id)
#
- # poll (or any packet with a valid game_id) is routed
+ # packet with a valid game_id is routed if resthost is != from server
#
- resthost = self.service.packet2resthost(PacketPokerPoll(game_id = 1))
- self.assertEqual('HOST', resthost[0])
+ resthost, game_id = self.service.packet2resthost(PacketPokerCheck(game_id = 102))
+ self.assertEqual('host2', resthost[0])
+ self.assertEqual(102, game_id)
#
- # poll to an unknown game_id is not routed
+ # packet is not routed if resthost point to the same server
#
- resthost = self.service.packet2resthost(PacketPokerPoll(game_id = 888))
+ resthost, game_id = self.service.packet2resthost(PacketPokerCheck(game_id = 1))
self.assertEqual(None, resthost)
+ self.assertEqual(1, game_id)
#
- # poll to a tourney is routed
+ # packet with an unknown game_id is not routed
#
- db = self.service.db
- db.db.query("INSERT INTO route VALUES (0, 484, 1, 1)")
- resthost = self.service.packet2resthost(PacketPokerPoll(tourney_serial = 484))
- self.assertEqual('HOST', resthost[0])
+ resthost, game_id = self.service.packet2resthost(PacketPokerCheck(game_id = 888))
+ self.assertEqual(None, resthost)
+ self.assertEqual(888, game_id)
#
- # poll to a tourney table is routed
+ # packet to a tourney is routed if resthost is != from server
#
db = self.service.db
- db.db.query("INSERT INTO route VALUES (232, 484, 1, 1)")
- resthost = self.service.packet2resthost(PacketPokerPoll(game_id = 232))
- self.assertEqual('HOST', resthost[0])
+ db.db.query("INSERT INTO route VALUES (0, 484, 1, 2)")
+ resthost, game_id = self.service.packet2resthost(PacketPokerGetTourneyManager(tourney_serial = 484))
+ self.assertEqual('host2', resthost[0])
#
- # poll to an unknown tourney_serial is not routed
+ # packet to a tourney is routed if resthost is != from server
#
- resthost = self.service.packet2resthost(PacketPokerPoll(tourney_serial = 999))
- self.assertEqual(None, resthost)
+ resthost, game_id = self.service.packet2resthost(PacketPokerTourneyRegister(game_id = 484))
+ self.assertEqual('host2', resthost[0])
#
- # PACKET_POKER_TOURNEY_REGISTER
+ # packet to a tourney table is routed if resthost is != from server
#
- resthost = self.service.packet2resthost(PacketPokerTourneyRegister(game_id = 484))
- self.assertEqual('HOST', resthost[0])
+ db.db.query("INSERT INTO route VALUES (232, 484, 1, 2)")
+ resthost, game_id = self.service.packet2resthost(PacketPokerCheck(game_id = 232))
+ self.assertEqual('host2', resthost[0])
+ self.assertEqual(232, game_id)
#
- # packet with a game_id
+ # packet to an unknown tourney_serial is not routed
#
- resthost = self.service.packet2resthost(PacketPokerCall(game_id = 1))
- self.assertEqual('HOST', resthost[0])
+ resthost, game_id = self.service.packet2resthost(PacketPokerGetTourneyManager(tourney_serial = 999))
+ self.assertEqual(None, resthost)
#
- # PACKET_POKER_GET_TOURNEY_MANAGER
+ # packet to an unknown tourney_serial is not routed
#
- db.db.query("INSERT INTO route VALUES (0, 1, 0, 1)")
- resthost = self.service.packet2resthost(PacketPokerGetTourneyManager(tourney_serial = 1))
- self.assertEqual('HOST', resthost[0])
+ resthost, game_id = self.service.packet2resthost(PacketPokerTourneyRegister(game_id = 999))
+ self.assertEqual(None, resthost)
def test02_packet2resthost_createTourney(self):
self.setUpService(self.xml)
self.service.startService()
- self.failIf(self.service.packet2resthost(PacketPokerCreateTourney()))
+ resthost, game_id = self.service.packet2resthost(PacketPokerCreateTourney())
+ self.failIf(resthost)
+ self.failIf(game_id)
db = self.service.db
db.db.query("INSERT INTO resthost VALUES (10, 'one', 'host1', 1, 'path1')")
db.db.query("INSERT INTO route VALUES (0, 100, 0, 10)")
db.db.query("INSERT INTO route VALUES (0, 200, 0, 10)")
db.db.query("INSERT INTO resthost VALUES (20, 'two', 'host2', 2, 'path2')")
db.db.query("INSERT INTO route VALUES (0, 300, 0, 20)")
- resthost = self.service.packet2resthost(PacketPokerCreateTourney())
+ resthost, game_id = self.service.packet2resthost(PacketPokerCreateTourney())
self.assertEqual('host2', resthost[0])
+ self.assertEqual(None, game_id)
class PokerServiceTestCase(PokerServiceTestCaseBase):
Index: tests/test-pokersite.py.in
===================================================================
--- tests/test-pokersite.py.in (revision 6213)
+++ tests/test-pokersite.py.in (working copy)
@@ -83,6 +83,9 @@
def getClientQueuedPacketMax(self):
return 2000
+ def packet2resthost(self, packet):
+ return (None, None)
+
class HelpersTestCase(unittest.TestCase):
def test_fromutf8(self):
signature.asc
Description: This is a digitally signed message part
_______________________________________________ Pokersource-users mailing list [email protected] https://mail.gna.org/listinfo/pokersource-users
