Hello community,

First of all, thanks for an awesome platform! I'm brand new to this
community, but have been using Twisted for a couple of years.

Reason for posting:
I've hit a condition with ReconnectingClientFactory that I'm not sure is by
design. I have a workaround right now, but need your perspective. It seems
like there should be a better/right way to do this.

Attempted design:
I'd like to have long-running TCP clients (running forever until stopped),
with a long-running TCP server. When a long-running client hits a problem
with a dependency (database is down, Kafka bus unavailable, external API not
responding, etc.), I want the client to go offline for a while and then come
back online: an automated, self-recovery type of action. Since it's not OK to
start/stop/restart the Twisted reactor, I am letting the client finish
whatever it can do, disconnect from the service, tear down the dependencies,
wait for a period of time, and then attempt a clean re-initialization of
those dependencies along with reconnecting to the Twisted server.

Problem case:
I'm using ReconnectingClientFactory in my client. When the client hits
a problem, it calls transport.loseConnection(). But whenever the client
calls this, it does not reconnect after the disconnect; stopFactory is
called and everything exits.

Workaround:
I noticed some Twisted source code that works off factory.numPorts. If
numPorts is 1 and the client loses the connection, it goes to 0 and calls
the cleanup. So I conditionally increase this number right before
intentionally disconnecting, and then reset it after reconnecting. This
solves the problem, but it's a hack.
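
The counting behaviour described above can be illustrated with a toy model. This is only a sketch, not Twisted source: ToyFactory is a hypothetical class whose doStart/doStop names mirror the Factory methods, and it models nothing beyond the counter dropping from 1 to 0 and triggering the cleanup.

```python
class ToyFactory:
    """Toy stand-in for the start/stop counting described above (not Twisted code)."""

    def __init__(self):
        self.numPorts = 0
        self.events = []

    def doStart(self):
        # The first user of the factory triggers startFactory.
        if self.numPorts == 0:
            self.events.append('startFactory')
        self.numPorts += 1

    def doStop(self):
        # The last user going away triggers stopFactory (the cleanup).
        if self.numPorts == 0:
            return
        self.numPorts -= 1
        if self.numPorts == 0:
            self.events.append('stopFactory')


plain = ToyFactory()
plain.doStart()        # connection attempt begins: numPorts 0 -> 1
plain.doStop()         # connection lost: numPorts 1 -> 0, cleanup runs

hacked = ToyFactory()
hacked.doStart()
hacked.numPorts += 1   # the workaround: bump the count before disconnecting
hacked.doStop()        # numPorts 2 -> 1, so no cleanup

print(plain.events)    # ['startFactory', 'stopFactory']
print(hacked.events)   # ['startFactory']
```

In this model the workaround works only because the counter never reaches zero, which is why it reads as a hack: the factory is kept "in use" artificially.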
I'll attach the test scripts to this post (if attachments are allowed), but
the main code is in these factory functions:

    def clientConnectionLost(self, connector, reason):
        print(' factory clientConnectionLost: reason: {}'.format(reason))
        # if self.disconnectedOnPurpose:
        #     ## Hack to keep reactor alive
        #     print(' factory clientConnectionLost: increasing numPorts')
        #     self.numPorts += 1
        #     self.numPortsChanged = True
        #     self.disconnectedOnPurpose = False
        print(' ... simulate client going idle before attempting restart...')
        time.sleep(5)
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
        print(' factory clientConnectionLost: end.\n')

    def clientConnectionMade(self):
        print(' factory clientConnectionMade: starting numPorts: {}'.format(self.numPorts))
        # if self.numPortsChanged :
        #     ## Resetting from hacked value
        #     print(' factory clientConnectionMade: decreasing numPorts')
        #     self.numPorts -= 1
        #     self.numPortsChanged = False
        print(' factory clientConnectionMade: finished numPorts: {}'.format(self.numPorts))

    def cleanup(self):
        print('factory cleanup: calling loseConnection')
        if self.connectedClient is not None:
            self.connectedClient.transport.loseConnection()
            self.disconnectedOnPurpose = True

With the above lines commented out, once the cleanup call does
transport.loseConnection(), the factory stops at the end of
clientConnectionLost.
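
One possibility worth ruling out, offered as a guess and not verified against the Twisted source: the attached client overrides stopFactory and calls self.stopTrying() there. If stopFactory runs right after clientConnectionLost has scheduled the retry (which would match the "stops at the end of clientConnectionLost" observation), that call could cancel the pending reconnect. The toy model below assumes exactly that ordering; ToyReconnectingFactory, lose_connection and the stop_cancels_retry flag are hypothetical names for illustration only.

```python
class ToyReconnectingFactory:
    """Toy model of a reconnecting factory; NOT Twisted's implementation."""

    def __init__(self, stop_cancels_retry):
        self.stop_cancels_retry = stop_cancels_retry
        self.pending_retry = None   # stands in for a scheduled reconnect
        self.log = []

    def clientConnectionLost(self):
        # Assumption: losing the connection schedules a retry.
        self.pending_retry = 'reconnect'
        self.log.append('retry scheduled')

    def stopTrying(self):
        # Assumption: stopTrying cancels any scheduled retry.
        if self.pending_retry is not None:
            self.pending_retry = None
            self.log.append('retry cancelled')

    def stopFactory(self):
        self.log.append('stopFactory')
        if self.stop_cancels_retry:
            self.stopTrying()


def lose_connection(factory):
    """Assumed ordering: clientConnectionLost first, then stopFactory."""
    factory.clientConnectionLost()
    factory.stopFactory()
    # True if a reconnect would still fire afterwards.
    return factory.pending_retry is not None


print(lose_connection(ToyReconnectingFactory(stop_cancels_retry=True)))   # False
print(lose_connection(ToyReconnectingFactory(stop_cancels_retry=False)))  # True
```

If the real ordering matches this assumption, removing stopTrying() from stopFactory in the test client would be a quick experiment to run before relying on the numPorts change.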
Sample scripts/logs:
I've tried to create short test scripts and corresponding logs (with the
client failing, and then with it restarting when I use the workaround).
I've cut out several thousand lines to get down to something simple for the
example test scripts, but I know the client is still a little long. Again,
I'm not sure if attachments work on the mailing list, but I'll attempt to
attach the client/server scripts with the corresponding pass/fail logs.
Thanks!
-Chris

import os, sys, traceback
import json, time, datetime, psutil
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor, task, defer, threads
from contextlib import suppress


class CustomLineReceiverProtocol(LineReceiver):
    delimiter = b':==:'


class ServiceClientProtocol(CustomLineReceiverProtocol):

    def connectionMade(self):
        print(' protocol connectionMade')
        self.factory.connectedClient = self
        self.factory.clientConnectionMade()

    def lineReceived(self, line):
        dataDict = json.loads(line)
        if dataDict.get('action') == 'healthRequest':
            self.factory.enterSimulateJob()

    def connectionLost(self, reason):
        print(' protocol connectionLost')
        self.factory.connectedClient = None

    def constructAndSendData(self, action, content):
        message = {}
        message['action'] = action
        message['content'] = content
        jsonMessage = json.dumps(message)
        msg = jsonMessage.encode('utf-8')
        print(' protocol constructAndSendData: {}'.format(msg))
        self.sendLine(msg)


class ServiceClientFactory(ReconnectingClientFactory):
    continueTrying = True

    def __init__(self):
        print('factory constructor')
        self.connectedClient = None
        self.health = {}
        self.loopingSystemHealth = task.LoopingCall(self.enterSystemHealthCheck)
        self.loopingSystemHealth.start(10)
        self.numPortsChanged = False
        self.disconnectedOnPurpose = False
        super().__init__()

    def buildProtocol(self, addr):
        print(' factory buildProtocol')
        self.resetDelay()
        protocol = ServiceClientProtocol()
        protocol.factory = self
        return protocol

    def clientConnectionLost(self, connector, reason):
        print(' factory clientConnectionLost: reason: {}'.format(reason))
        # if self.disconnectedOnPurpose:
        #     ## Hack to keep reactor alive
        #     print(' factory clientConnectionLost: increasing numPorts')
        #     self.numPorts += 1
        #     self.numPortsChanged = True
        #     self.disconnectedOnPurpose = False
        print(' ... simulate client going idle before attempting restart...')
        time.sleep(5)
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
        print(' factory clientConnectionLost: end.\n')

    def clientConnectionMade(self):
        print(' factory clientConnectionMade: starting numPorts: {}'.format(self.numPorts))
        # if self.numPortsChanged :
        #     ## Resetting from hacked value
        #     print(' factory clientConnectionMade: decreasing numPorts')
        #     self.numPorts -= 1
        #     self.numPortsChanged = False
        print(' factory clientConnectionMade: finished numPorts: {}'.format(self.numPorts))
        print(' ..... pausing for <CTRL><C> test')
        time.sleep(3)

    def cleanup(self):
        print('factory cleanup: calling loseConnection')
        if self.connectedClient is not None:
            self.connectedClient.transport.loseConnection()
            self.disconnectedOnPurpose = True

    def stopFactory(self):
        print('stopFactory')
        self.stopTrying()
        with suppress(Exception):
            self.loopingSystemHealth.stop()
        print('stopFactory end.')

    def enterSimulateJob(self):
        print(' factory enterSimulateJob')
        threadHandle = threads.deferToThread(self.simulateJob)
        return threadHandle

    def simulateJob(self):
        print(' factory simulateJob: starting job')
        time.sleep(2)
        self.connectedClient.constructAndSendData('jobResponse', self.health)
        print(' factory simulateJob: finished job... time to reset the client (diconnect/re-initialize)...')
        self.cleanup()

    def enterSystemHealthCheck(self):
        print(' factory enterSystemHealthCheck')
        threadHandle = threads.deferToThread(self.getSystemHealth)
        return threadHandle

    def getSystemHealth(self):
        print(' factory getSystemHealth')
        try:
            currentTime = time.time()
            process = psutil.Process(os.getpid())
            startTime = process.create_time()
            self.health = {
                'processCpuPercent': process.cpu_percent(),
                'processMemory': process.memory_full_info().uss,
                'processRunTime': int(currentTime-startTime)
            }
            print(' factory getSystemHealth: system health: {}'.format(self.health))
        except:
            exception = traceback.format_exception(sys.exc_info()[0],
                                                   sys.exc_info()[1],
                                                   sys.exc_info()[2])
            print(' factory getSystemHealth: exception: {}'.format(exception))


if __name__ == '__main__':
    try:
        connector = reactor.connectTCP('127.0.0.1', 51841,
                                       ServiceClientFactory(), timeout=300)
        reactor.run()
    except:
        stacktrace = traceback.format_exception(sys.exc_info()[0],
                                                sys.exc_info()[1],
                                                sys.exc_info()[2])
        print('clientWrapper exception: {}'.format(stacktrace))
    print('exiting')
    sys.exit(0)

import sys, traceback
import json
from twisted.internet import reactor, task, defer, threads
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver


class CustomLineReceiverProtocol(LineReceiver):
    delimiter = b':==:'


class ServiceListener(CustomLineReceiverProtocol):

    def connectionMade(self):
        print(' protocol connectionMade')
        self.factory.activeClients.append(self)

    def connectionLost(self, reason):
        print(' protocol connectionLost')
        self.factory.removeClient(self)

    def lineReceived(self, line):
        print(' protocol lineReceived: {}'.format(line))

    def constructAndSendData(self, action):
        message = {'action': action}
        jsonMessage = json.dumps(message)
        msg = jsonMessage.encode('utf-8')
        print(' protocol constructAndSendData: {}'.format(msg))
        self.sendLine(msg)


class ServiceFactory(ServerFactory):
    protocol = ServiceListener

    def __init__(self):
        print('factory constructor')
        super().__init__()
        self.activeClients = []
        self.loopingHealthUpdates = task.LoopingCall(self.enterSystemHealthCheck)
        self.loopingHealthUpdates.start(15)

    def removeClient(self, client):
        print(' factory removeClient')
        self.activeClients.remove(client)

    def enterSystemHealthCheck(self):
        print(' factory enterSystemHealthCheck')
        threadHandle = threads.deferToThread(self.sendHealthRequest)
        return threadHandle

    def sendHealthRequest(self):
        if len(self.activeClients) <= 0:
            print(' factory sendHealthRequest: no active clients to talk to')
        else:
            for client in self.activeClients:
                print(' factory sendHealthRequest: requesting from client...')
                client.constructAndSendData('healthRequest')


if __name__ == '__main__':
    try:
        reactor.listenTCP(51841, ServiceFactory(),
                          interface='127.0.0.1')
        reactor.run()
    except:
        stacktrace = traceback.format_exception(sys.exc_info()[0],
                                                sys.exc_info()[1],
                                                sys.exc_info()[2])
        print('clientWrapper exception: {}'.format(stacktrace))
    print('exiting')
    sys.exit(0)
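
For readers without Twisted at hand, the wire format both scripts use (JSON objects separated by the b':==:' delimiter) can be sketched with the standard library alone. The frame and split_frames helpers are hypothetical names, not part of the attached scripts or of Twisted; they only imitate what sendLine and lineReceived do with the custom delimiter.

```python
import json

DELIMITER = b':==:'   # same delimiter both scripts set on their LineReceiver


def frame(action, content=None):
    """Encode one message as JSON bytes followed by the delimiter."""
    message = {'action': action}
    if content is not None:
        message['content'] = content
    return json.dumps(message).encode('utf-8') + DELIMITER


def split_frames(buffer):
    """Return (decoded messages, leftover bytes) from a receive buffer."""
    *complete, leftover = buffer.split(DELIMITER)
    return [json.loads(chunk) for chunk in complete], leftover


# Two complete messages followed by the start of a third, as might arrive
# in a single TCP read.
stream = (frame('healthRequest')
          + frame('jobResponse', {'processRunTime': 12})
          + b'{"act')
messages, rest = split_frames(stream)
print(messages)
print(rest)   # b'{"act' stays buffered until the rest of the message arrives
```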

Attachments:
client_not_working.log
client_working_with_hack.log
server.log
