-----BEGIN PGP SIGNED MESSAGE----- Hash: SHA1 Hi,
With the patch you implemented last Friday (long poll 3/5: all requests for a session are handled by the same server), the PokerAvatar instance that receives packets can delegate their handling to the server responsible for them. For instance, if the table T is on server S1 and the PokerAvatar receiving the request (responsible for the session) is on server S2, the request has to be sent from S2 to S1. When S1 answers, S2 takes the packets and adds them to the PokerAvatar queue, thereby acting as a reverse proxy. The idea is to enable PokerExplain on the S2 server (responsible for the session) and not on S1. Activating PokerExplain on both would lead to chaos anyway :-) Here is a draft patch that explains the intended implementation better than a long description would. While implementing it, I realized the PokerRestClient API is wrong. Instead of setting a callback (a receive function), it must fire a deferred that is returned by the sendPacket function. Callbacks are not in the Twisted tradition anyway. The use of the deferred is illustrated below. Could you please be on #pokersource Monday morning around 11am CEST to discuss the specifics of the implementation? Cheers -----BEGIN PGP SIGNATURE----- Version: GnuPG v2.0.11 (GNU/Linux) Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org iEYEARECAAYFAkqj+nEACgkQ8dLMyEl6F22uawCgvuyk15pBkwtrI83uj7XBTC8K Ai4AoMShAjDlwGGLHYv7F3oLG5SoByHD =4s1V -----END PGP SIGNATURE-----
Index: pokerservice.py
===================================================================
--- pokerservice.py (revision 6204)
+++ pokerservice.py (working copy)
@@ -1725,6 +1725,11 @@
cursor.close()
def packet2resthost(self, packet):
+ #
+ # game_id is only set for packets related to a table and not for
+ # packets that are delegated but related to tournaments.
+ #
+ game_id = None
cursor = self.db.cursor()
if packet.type == PACKET_POKER_CREATE_TOURNEY:
#
@@ -1735,31 +1740,30 @@
self.message("packet2resthost: create tourney " + sql)
cursor.execute(sql)
else:
- if packet.type == PACKET_POKER_POLL:
- wheres = []
- if packet.game_id > 0:
- wheres.append("table_serial = " + str(packet.game_id))
- if packet.tourney_serial > 0:
- wheres.append(" tourney_serial = " + str(packet.tourney_serial))
- where = " AND ".join(wheres)
- elif packet.type in ( PACKET_POKER_TOURNEY_REQUEST_PLAYERS_LIST, PACKET_POKER_TOURNEY_REGISTER, PACKET_POKER_TOURNEY_UNREGISTER ):
+ if packet.type in ( PACKET_POKER_TOURNEY_REQUEST_PLAYERS_LIST, PACKET_POKER_TOURNEY_REGISTER, PACKET_POKER_TOURNEY_UNREGISTER ):
where = "tourney_serial = " + str(packet.game_id)
elif packet.type in ( PACKET_POKER_GET_TOURNEY_MANAGER, ):
where = "tourney_serial = " + str(packet.tourney_serial)
elif hasattr(packet, "game_id"):
where = "table_serial = " + str(packet.game_id)
+ game_id = packet.game_id
else:
cursor.close()
return None
- cursor.execute("SELECT host, port, path FROM route,resthost WHERE " +
- " route.resthost_serial = resthost.serial AND " + where)
+ cursor.execute("SELECT host, port, path FROM route,resthost " +
+ " WHERE route.resthost_serial = resthost.serial " +
+ #
+ # exclude the routes that go to this server
+ #
+ " AND resthost.serial != " + self.resthost_serial +
+ " AND " + where)
if cursor.rowcount > 0:
result = cursor.fetchone()
else:
result = None
cursor.close()
- return result
+ return ( result, game_id )
def cleanUp(self, temporary_users = ''):
cursor = self.db.cursor()
Index: pokersite.py
===================================================================
--- pokersite.py (revision 6203)
+++ pokersite.py (working copy)
@@ -225,18 +225,18 @@
#
return True
- deferred.addCallback(lambda result: self.deferRender(request, jsonp, packet))
+ deferred.addCallback(lambda result: self.deferRender(request, jsonp, packet, data))
deferred.addErrback(pipesFailed)
return server.NOT_DONE_YET
- def deferRender(self, request, jsonp, packet):
+ def deferRender(self, request, jsonp, packet, data):
if request.finished:
#
# For instance : the request was reverse-proxied to a server.
#
return True
session = request.getSession()
- d = defer.maybeDeferred(session.avatar.handlePacketDefer, packet)
+ d = defer.maybeDeferred(session.avatar.handleDistributedPacket, request, packet, data)
def render(packets):
#
# update the session information if the avatar changed
Index: pokeravatar.py
===================================================================
--- pokeravatar.py (revision 6203)
+++ pokeravatar.py (working copy)
@@ -62,6 +62,9 @@
self.has_session = False
self.bugous_processing_hand = False
self.noqueuePackets()
+ self._block_longpoll_deferred = False
+ self._longpoll_deferred = False
+ self.game_id2rest_client = {}
def __str__(self):
return "PokerAvatar serial = %s, name = %s" % ( self.getSerial(), self.getName() )
@@ -247,6 +250,7 @@
# This method was introduced when we added the force-disconnect as
# the stop-gap.
self._packets_queue.extend(newPackets)
+ self.flushLongPollDefered()
warnVal = int(.75 * self.service.getClientQueuedPacketMax())
if len(self._packets_queue) >= warnVal:
# If we have not warned yet that packet queue is getting long, warn now.
@@ -339,6 +343,67 @@
else:
return False
+ def longpollDeferred(self):
+     #
+     # Arm a new long poll deferred, replacing whatever was stored in
+     # self._longpoll_deferred, then try to fire it right away:
+     # flushLongPollDeferred fires it if packets are already queued
+     # and flushing is not blocked.
+     #
+     pending = defer.Deferred()
+     self._longpoll_deferred = pending
+     self.flushLongPollDeferred()
+
+ def blockLongPollDeferred(self):
+ #
+ # Raise the flag checked by flushLongPollDeferred: while it is set,
+ # the long poll deferred is not fired even if packets are queued.
+ #
+ self._block_longpoll_deferred = True
+
+ def unblockLongPollDeferred(self):
+ #
+ # Clear the flag set by blockLongPollDeferred and immediately try to
+ # fire the long poll deferred with whatever was queued meanwhile.
+ #
+ self._block_longpoll_deferred = False
+ self.flushLongPollDeferred()
+
+ def flushLongPollDeferred(self):
+     #
+     # Fire the pending long poll deferred with every queued packet.
+     # Nothing happens while flushing is blocked, when no long poll
+     # deferred is pending, or when the packet queue is empty.
+     #
+     if self._block_longpoll_deferred:
+         return
+     if not self._longpoll_deferred or not self._packets_queue:
+         return
+     packets = self.resetPacketsQueue()
+     #
+     # The attribute is cleared before the deferred is fired, so a
+     # call to this method made from one of its callbacks finds no
+     # pending deferred.
+     #
+     d = self._longpoll_deferred
+     self._longpoll_deferred = None
+     d.callback(packets)
+
+ def handleDistributedPacket(self, request, packet, data):
+     #
+     # Forward the packet to the server it is routed to, if any,
+     # otherwise handle it locally with handlePacketDefer.
+     #
+     # packet2resthost returns None when the packet cannot be routed
+     # and ( result, game_id ) otherwise, where result is None when no
+     # remote route matched. A non-empty tuple is always true, so the
+     # route itself must be checked before delegating; otherwise
+     # distributePacket would try to unpack None.
+     #
+     # request is not used here; it is part of the signature called
+     # from deferRender in pokersite.py.
+     #
+     resthost = self.service.packet2resthost(packet)
+     if resthost and resthost[0]:
+         return self.distributePacket(packet, data, resthost)
+     return self.handlePacketDefer(packet)
+
+ def getOrCreateRestClient(self, resthost):
+     #
+     # Return the PokerRestClient used to reach resthost, which is
+     # given as ( ( host, port, path ), game_id ).
+     #
+     # no game_id means the request must be delegated for tournament
+     # registration or creation. Not for table interaction. Such a
+     # client is created for this request only and is not cached.
+     # Clients for tables are cached in game_id2rest_client, one per
+     # game_id.
+     #
+     ( ( host, port, path ), game_id ) = resthost
+     if not game_id:
+         return PokerRestClient(host, port, path, self.service.verbose)
+     #
+     # "in" instead of the deprecated dict.has_key, as already done
+     # for self.tables in incomingDistributedPackets.
+     #
+     if game_id not in self.game_id2rest_client:
+         self.game_id2rest_client[game_id] = PokerRestClient(host, port, path, self.service.verbose)
+     return self.game_id2rest_client[game_id]
+
+ def distributePacket(self, packet, data, resthost):
+     #
+     # Send packet (and its raw data) to the server described by
+     # resthost and pass the packets it answers with to
+     # incomingDistributedPackets. Returns the deferred obtained from
+     # the rest client's sendPacket.
+     #
+     ( ( _host, _port, _path ), game_id ) = resthost
+     def answered(packets):
+         return self.incomingDistributedPackets(packets, game_id)
+     d = self.getOrCreateRestClient(resthost).sendPacket(packet, data)
+     d.addCallback(answered)
+     return d
+
+ def incomingDistributedPackets(self, packets, game_id):
+     #
+     # Take the packets answered by a remote server and send them
+     # through sendPacket, as if they had been produced locally.
+     # game_id is the table the request was about, or None for
+     # requests that are not related to a table.
+     #
+     if game_id and game_id not in self.tables:
+         #
+         # discard client if nothing pending and not in the list
+         # of active tables
+         #
+         # The client is looked up here: it was never bound in this
+         # method, and there may be none cached for this game_id.
+         #
+         # NOTE(review): the test below also discards the client when
+         # pendingLongPoll is set, which does not obviously match
+         # "nothing pending" -- confirm against PokerRestClient.
+         #
+         client = self.game_id2rest_client.get(game_id)
+         if client is not None and ( len(client.queue.callbacks) <= 0 or
+                                     client.pendingLongPoll ):
+             del self.game_id2rest_client[game_id]
+     #
+     # The long poll is held back while the packets are sent,
+     # presumably so that it fires once with all of them. try/finally
+     # makes sure it is released even if sendPacket raises; otherwise
+     # the avatar's long poll would stay blocked forever.
+     #
+     self.blockLongPollDeferred()
+     try:
+         for packet in packets:
+             self.sendPacket(packet)
+     finally:
+         self.unblockLongPollDeferred()
+
def handlePacketDefer(self, packet):
self.queuePackets()
self.handlePacketLogic(packet)
@@ -367,7 +432,14 @@
def handlePacketLogic(self, packet):
if self.service.verbose > 2 and packet.type != PACKET_PING:
self.message("handlePacketLogic(%d): " % self.getSerial() + str(packet))
-
+ if packet.type == PACKET_POKER_LONG_POLL:
+ self.longpollDeferred()
+ return
+
+ if packet.type == PACKET_POKER_LONG_POLL_RETURN:
+ self.flushLongPollDeferred()
+ return
+
if packet.type == PACKET_POKER_EXPLAIN:
if self.setExplain(packet.value):
self.sendPacketVerbose(PacketAck())
1.patch.sig
Description: PGP signature
<<attachment: loic.vcf>>
_______________________________________________ Pokersource-users mailing list [email protected] https://mail.gna.org/listinfo/pokersource-users
