Hi, I am using Thrift in a Python client, but I found that Thrift lacks a
Python TSocketPool implementation. I ported one from the PHP version and am
sending it in the attachment in case others find it useful.
This is a very rudimentary port. I can make it a complete port if you
think it is useful and can be put into the Thrift code base. Please also let me
know what else should be done for it to be helpful to others.
Thanks,
Robert
from thrift.transport.TTransport import *
from thrift.transport.TSocket import *
from random import *
import os
import errno
import socket
import sys
import time
class TSocketPool(TSocket):
    """Client socket that connects to the first reachable server of a pool.

    Ported from the PHP TSocketPool. open() walks the configured
    (host, port) pairs, optionally in random order, and delegates the actual
    connection to TSocket.open(). Servers that were recently marked as failed
    are skipped until retryInterval seconds have elapsed.

    Tunable instance attributes (assign after construction):
      randomize              -- shuffle the server order on every open()
      retryInterval          -- seconds a server marked as failed is skipped
      numRetries             -- connection attempts per server per open()
      maxConsecutiveFailures -- number of failed passes over a server before
                                its failure time is recorded
      alwaysTryLast          -- always try the last candidate, even while it
                                is marked as failed
    """

    # Failure bookkeeping shared by every TSocketPool instance in the process
    # (it takes the place of the APC cache used by the PHP version).
    #   'thrift_failtime:<host>:<port>~'    -> int timestamp of the recorded
    #                                          failure, 0 when cleared
    #   'thrift_consecfails:<host>:<port>~' -> running count of failed passes
    serverStates = {}

    def __init__(self, host, port=9090):
        """Create a pool.

        host -- a single host name, or a list of (host, port) pairs
        port -- port paired with a single host name; unused when host is a
                list
        """
        TSocket.__init__(self)
        self.randomize = True
        self.retryInterval = 60
        self.numRetries = 1
        self.maxConsecutiveFailures = 1
        self.alwaysTryLast = True
        if isinstance(host, list):
            self.servers = host
        else:
            self.servers = [(host, port)]

    def open(self):
        """Connect to the first server in the pool that accepts a connection.

        On success self.host and self.port hold the server that was chosen.

        Raises TTransportException (type NOT_OPEN) when no server in the pool
        could be connected to.
        """
        # Work on a copy so shuffling never reorders the configured list.
        candidates = list(self.servers)
        if self.randomize:
            shuffle(candidates)

        lastIndex = len(candidates) - 1
        for index, (host, port) in enumerate(candidates):
            # The ':' separator keeps e.g. ('10.0.0.1', 19090) and
            # ('10.0.0.11', 9090) from sharing one state entry.
            failtimeKey = 'thrift_failtime:%s:%d~' % (host, port)

            # No entry means the server has no recorded failure.
            lastFailtime = TSocketPool.serverStates.get(failtimeKey, 0)
            retryIntervalPassed = (
                lastFailtime > 0 and
                int(time.time()) - lastFailtime > self.retryInterval)

            # Skip servers inside their fail interval, unless this is the
            # last candidate and we were told to hammer away on it anyway.
            isLastServer = bool(self.alwaysTryLast) and index == lastIndex
            if not (lastFailtime == 0 or isLastServer or retryIntervalPassed):
                continue

            # Point the underlying TSocket at this server.
            self.host = host
            self.port = port

            for _ in range(self.numRetries):
                try:
                    TSocket.open(self)
                except TTransportException:
                    # Connection failed; use the next attempt, if any.
                    continue
                # Connected: clear a previously recorded failure time.
                if lastFailtime > 0:
                    TSocketPool.serverStates[failtimeKey] = 0
                return

            # Every attempt on this server failed; record it once per pass.
            consecfailsKey = 'thrift_consecfails:%s:%d~' % (host, port)
            consecfails = TSocketPool.serverStates.get(consecfailsKey, 0) + 1
            if consecfails >= self.maxConsecutiveFailures:
                # Threshold reached: store the failure time and restart the
                # count of consecutive failures.
                TSocketPool.serverStates[failtimeKey] = int(time.time())
                TSocketPool.serverStates[consecfailsKey] = 0
            else:
                TSocketPool.serverStates[consecfailsKey] = consecfails

        # Oh no; we failed them all. The system is totally ill!
        hostlist = ','.join(['%s:%d' % (s[0], s[1]) for s in self.servers])
        error = 'TSocketPool: All hosts in pool are down. (%s)' % hostlist
        # TTransportException is still an Exception, so callers that caught
        # the former bare Exception keep working.
        raise TTransportException(TTransportException.NOT_OPEN, error)