-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Hi,

With the introduction of the patch you implemented last friday ( long
poll 3/5 : all requests for a session are handled by the same server
), the PokerAvatar instance that receives packets can delegate the
handling of packets to the server responsible for it. For instance if
the table T is on server S1 and the PokerAvatar receiving the request
(responsible for the session) is on server S2, the request will have
to be sent from S2 to S1. When S1 answers, S2 will take the packets
and add them to the PokerAvatar queue, therefore acting as a reverse
proxy.

The idea is to enable PokerExplain on the S2 server (responsible for
the session) and not on S1. Activating PokerExplain on both would lead
to chaos anyway :-)

Here is a draft patch of the implementation that will better explain
the intended implementation than a long explanation. While
implementing it, I realized the PokerRestClient API is wrong. Instead
of setting a callback ( receive function ) it must fire a deferred
that is returned by the sendPacket function. Callbacks are not in the
twisted tradition anyway. The use of the deferred is illustrated below.

Could you please be on #pokersource to discuss the specifics of the
implementation monday morning around 11am CEST ?

Cheers





-----BEGIN PGP SIGNATURE-----
Version: GnuPG v2.0.11 (GNU/Linux)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org

iEYEARECAAYFAkqj+nEACgkQ8dLMyEl6F22uawCgvuyk15pBkwtrI83uj7XBTC8K
Ai4AoMShAjDlwGGLHYv7F3oLG5SoByHD
=4s1V
-----END PGP SIGNATURE-----

Index: pokerservice.py
===================================================================
--- pokerservice.py	(revision 6204)
+++ pokerservice.py	(working copy)
@@ -1725,6 +1725,11 @@
             cursor.close()
 
     def packet2resthost(self, packet):
+        #
+        # game_id is only set for packets related to a table and not for
+        # packets that are delegated but related to tournaments.
+        #
+        game_id = None
         cursor = self.db.cursor()
         if packet.type == PACKET_POKER_CREATE_TOURNEY:
             #
@@ -1735,31 +1740,30 @@
                 self.message("packet2resthost: create tourney " + sql)
             cursor.execute(sql)
         else:
-            if packet.type == PACKET_POKER_POLL:
-                wheres = []
-                if packet.game_id > 0:
-                    wheres.append("table_serial = " + str(packet.game_id))
-                if packet.tourney_serial > 0:
-                    wheres.append(" tourney_serial = " + str(packet.tourney_serial))
-                where = " AND ".join(wheres)
-            elif packet.type in ( PACKET_POKER_TOURNEY_REQUEST_PLAYERS_LIST, PACKET_POKER_TOURNEY_REGISTER, PACKET_POKER_TOURNEY_UNREGISTER ):
+            if packet.type in ( PACKET_POKER_TOURNEY_REQUEST_PLAYERS_LIST, PACKET_POKER_TOURNEY_REGISTER, PACKET_POKER_TOURNEY_UNREGISTER ):
                 where = "tourney_serial = " + str(packet.game_id)
             elif packet.type in ( PACKET_POKER_GET_TOURNEY_MANAGER, ):
                 where = "tourney_serial = " + str(packet.tourney_serial)
             elif hasattr(packet, "game_id"):
                 where = "table_serial = " + str(packet.game_id)
+                game_id = packet.game_id
             else:
                 cursor.close()
                 return None
 
-            cursor.execute("SELECT host, port, path FROM route,resthost WHERE " +
-                           " route.resthost_serial = resthost.serial AND " + where)
+            cursor.execute("SELECT host, port, path FROM route,resthost " +
+                           " WHERE route.resthost_serial = resthost.serial " +
+                           #
+                           # exclude the routes that go to this server
+                           #
+                           " AND resthost.serial != " + self.resthost_serial +
+                           " AND " + where)
         if cursor.rowcount > 0:
             result = cursor.fetchone()
         else:
             result = None
         cursor.close()
-        return result
+        return ( result, game_id )
 
     def cleanUp(self, temporary_users = ''):
         cursor = self.db.cursor()
Index: pokersite.py
===================================================================
--- pokersite.py	(revision 6203)
+++ pokersite.py	(working copy)
@@ -225,18 +225,18 @@
             #
             return True
 
-        deferred.addCallback(lambda result: self.deferRender(request, jsonp, packet))
+        deferred.addCallback(lambda result: self.deferRender(request, jsonp, packet, data))
         deferred.addErrback(pipesFailed)
         return server.NOT_DONE_YET
 
-    def deferRender(self, request, jsonp, packet):
+    def deferRender(self, request, jsonp, packet, data):
         if request.finished:
             #
             # For instance : the request was reverse-proxied to a server.
             #
             return True
         session = request.getSession()
-        d = defer.maybeDeferred(session.avatar.handlePacketDefer, packet)
+        d = defer.maybeDeferred(session.avatar.handleDistributedPacket, request, packet, data)
         def render(packets):
             #
             # update the session information if the avatar changed
Index: pokeravatar.py
===================================================================
--- pokeravatar.py	(revision 6203)
+++ pokeravatar.py	(working copy)
@@ -62,6 +62,9 @@
         self.has_session = False
         self.bugous_processing_hand = False
         self.noqueuePackets()
+        self._block_longpoll_deferred = False
+        self._longpoll_deferred = False
+        self.game_id2rest_client = {}
 
     def __str__(self):
         return "PokerAvatar serial = %s, name = %s" % ( self.getSerial(), self.getName() )
@@ -247,6 +250,7 @@
         # This method was introduced when we added the force-disconnect as
         # the stop-gap.
         self._packets_queue.extend(newPackets)
+        self.flushLongPollDefered()
         warnVal = int(.75 * self.service.getClientQueuedPacketMax())
         if len(self._packets_queue) >= warnVal:
             # If we have not warned yet that packet queue is getting long, warn now.
@@ -339,6 +343,67 @@
         else:
             return False
 
+    def longpollDeferred(self):
+        self._longpoll_deferred = defer.Deferred()
+        self.flushLongPollDeferred()
+
+    def blockLongPollDeferred(self):
+        self._block_longpoll_deferred = True
+        
+    def unblockLongPollDeferred(self):
+        self._block_longpoll_deferred = False
+        self.flushLongPollDeferred()
+
+    def flushLongPollDeferred(self):
+        if self._block_longpoll_deferred == False and self._longpoll_deferred and len(self._packets_queue) > 0:
+            packets = self.resetPacketsQueue()
+            d = self._longpoll_deferred
+            self._longpoll_deferred = None
+            d.callback(packets)
+            
+    def handleDistributedPacket(self, request, packet, data):
+        resthost = self.service.packet2resthost(packet)
+        if resthost:
+            return self.distributePacket(packet, data, resthost)
+        else:
+            return self.handlePacketDefer(packet)
+
+    def getOrCreateRestClient(self, resthost):
+        #
+        # no game_id means the request must be delegated for tournament
+        # registration or creation. Not for table interaction.
+        #
+        ( ( host, port, path ), game_id ) = resthost
+        if game_id:
+            if not self.game_id2rest_client.has_key(game_id):
+                self.game_id2rest_client[game_id] = PokerRestClient(host, port, path, self.service.verbose)
+            client = self.game_id2rest_client[game_id]
+        else:
+            client = PokerRestClient(host, port, path, self.service.verbose)
+        return client
+            
+    def distributePacket(self, packet, data, resthost):
+        ( ( host, port, path ), game_id ) = resthost
+        client = self.getOrCreateRestClient(resthost)
+        d = client.sendPacket(packet, data)
+        d.addCallback(lambda packets: self.incomingDistributedPackets(packets, game_id))
+        return d
+            
+    def incomingDistributedPackets(self, packets, game_id):
+        if game_id:
+            if game_id not in self.tables:
+                #
+                # discard client if nothing pending and not in the list
+                # of active tables
+                #
+                if ( len(client.queue.callbacks) <= 0 or
+                     client.pendingLongPoll ):
+                    del self.game_id2rest_client[game_id]
+        self.blockLongPollDeferred()
+        for packet in packets:
+            self.sendPacket(packet)
+        self.unblockLongPollDeferred()
+
     def handlePacketDefer(self, packet):
         self.queuePackets()
         self.handlePacketLogic(packet)
@@ -367,7 +432,14 @@
     def handlePacketLogic(self, packet):
         if self.service.verbose > 2 and packet.type != PACKET_PING:
             self.message("handlePacketLogic(%d): " % self.getSerial() + str(packet))
-        
+        if packet.type == PACKET_POKER_LONG_POLL:
+            self.longpollDeferred()
+            return
+
+        if packet.type == PACKET_POKER_LONG_POLL_RETURN:
+            self.flushLongPollDeferred()
+            return
+
         if packet.type == PACKET_POKER_EXPLAIN:
             if self.setExplain(packet.value):
                 self.sendPacketVerbose(PacketAck())

Attachment: 1.patch.sig
Description: PGP signature

<<attachment: loic.vcf>>

_______________________________________________
Pokersource-users mailing list
[email protected]
https://mail.gna.org/listinfo/pokersource-users

Reply via email to