On Jan 14, 2011, at 3:26 PM, A.M. wrote:

> I suspect I have to implement special converter methods with psycopg2 
> register_adapter/new_type. That is what I am experimenting with now.


It looks like SQLAlchemy's array type support doesn't support anything beyond 
basic built-in types that accept numbers or text values. (Please do correct me 
if I am wrong.) I suspect this is a limitation of the default psycopg2 array 
support which does not allow passing a type along with the array during 
execution. It also seems to always bind arrays inline.

I worked around this by installing type converters directly in psycopg2. 
SQLAlchemy then ends up with the values as pre-cooked ACLItem and ACLItemArray 
objects. From an external API standpoint, nothing changed.

One stumbling block I had was that the code sample in the docs was too slim and 
there is no UserDefinedType example in the examples directory, so I spent a lot 
of time in the debugger. I hope this code may help others.

Here is the code that worked:
======================
import sqlalchemy.types as types
import re
import sqlalchemy.exc
from sqlalchemy.dialects.postgresql import ARRAY
from project.lib.aclitem import ACLItem,ACLItemArray
from copy import deepcopy

#include/utils/acl.h
#define ACL_ALL_RIGHTS_STR      "arwdDxtXUCTc" 
#update for SQLAlchemy 0.7: http://www.sqlalchemy.org/docs/07/orm/extensions/mutable.html

class ACLItemArrayType(types.MutableType,
                       types.UserDefinedType,
                       types.Concatenable):
    """SQLAlchemy column type for a PostgreSQL ``aclitem[]`` column.

    This type performs no conversion itself: both processor hooks return
    None, so values pass through exactly as the DB-API driver supplies and
    accepts them.  Because it is a ``MutableType``, it also tells SQLAlchemy
    how to snapshot and compare values.
    """

    def get_col_spec(self):
        """Return the type name emitted in DDL for this column."""
        return 'aclitem[]'

    def bind_processor(self, dialect):
        """Return None: bound parameters are passed to the driver as-is."""
        return None

    def result_processor(self, dialect, column_type):
        """Return None: result values are used exactly as the driver returns them.

        This is the hook name SQLAlchemy actually looks up; the original
        code spelled it ``result_process``, which is never called.
        """
        return None

    def result_process(self, dialect):
        """Deprecated misspelling of ``result_processor``; kept for old callers."""
        return None

    def copy_value(self, value):
        """Return an independent deep copy of *value*."""
        return deepcopy(value)

    def compare_values(self, a, b):
        """Return True when the two values compare equal with ``==``."""
        return a == b

class ACLItemType(types.MutableType,types.UserDefinedType):
    """SQLAlchemy column type for a single PostgreSQL ``aclitem`` value.

    Neither processor hook converts anything; both return None so values
    are exchanged with the driver unchanged.
    """
    # NOTE: the base-class order matters.  The author marked the reverse
    # order, (types.UserDefinedType, types.MutableType), as "FAIL".

    def get_col_spec(self):
        """Return the type name emitted in DDL for this column."""
        return 'aclitem'

    def copy_value(self,value):
        """Return an independent deep copy of *value*."""
        duplicate = deepcopy(value)
        return duplicate

    def bind_processor(self,dialect):
        """Return None: bound parameters are passed to the driver as-is."""
        return None

    def result_processor(self,dialect,column_type):
        """Return None: result values are used as the driver returns them."""
        return None

        
===========
import re
from copy import deepcopy

class ACLItem(object):
    """One access-control entry: a grantee, its permissions and the grantor.

    Attributes:
        grantee: name the permissions are granted to.
        permissions: list of permission codes (single characters when the
            item is built from a string such as ``'rU'``).
        grantor: name that granted the permissions.
        grant_option: when true, ``'*'`` is emitted after the permissions.
    """

    def __init__(self, grantee, permissions, grantor, grant_option=False):
        self.grantee = grantee
        # Always keep a private list so that mutating this item never
        # touches the sequence the caller passed in.
        self.permissions = list(permissions) if permissions else []
        self.grantor = grantor
        self.grant_option = grant_option

    def _as_pgsql_string(self):
        """Return the item as ``'user <grantee>=<perms>[*]/<grantor>'``."""
        star = '*' if self.grant_option else ''
        return "user %s=%s%s/%s" % (
            self.grantee, ''.join(self.permissions), star, self.grantor)

    def __deepcopy__(self, memo):
        """Return a new ACLItem with its own permissions list."""
        return ACLItem(
            self.grantee, self.permissions, self.grantor, self.grant_option)

    def __copy__(self):
        """Shallow copies are deep copies, so the permissions list is never shared."""
        return deepcopy(self)

    @classmethod
    def _from_pgsql_string(klass, aclstring):
        """Parse a string of the form ``grantee=perms*/grantor``.

        Raises:
            ValueError: if *aclstring* does not match that form.
        """
        # Raw string: the pattern contains backslash escapes meant for the
        # regex engine, not for the Python string literal.
        matches = re.match(r'([^=]+)=([^/\*]+)(\*?)/(\w+)', aclstring)
        if matches is None:
            raise ValueError(
                'string does not appear to represent a PostgreSQL ACL')
        grantee = matches.group(1)
        permissions = matches.group(2)
        grant_option = bool(matches.group(3))
        grantor = matches.group(4)
        return ACLItem(grantee, permissions, grantor, grant_option)

    def __eq__(self, other_object):
        """Two items are equal when their string forms are identical."""
        if not isinstance(other_object, self.__class__):
            return False
        return str(self) == str(other_object)

    def __ne__(self, other_object):
        """Inverse of ``__eq__`` (Python 2 does not derive it automatically)."""
        return not self.__eq__(other_object)

    def has_permission(self, permission_test):
        """Return True when *permission_test* is among the permissions."""
        return permission_test in self.permissions

    def set_permission(self, permission, on=True):
        """Add *permission* when *on* is true, otherwise remove it.

        Does nothing when the item is already in the requested state.
        """
        present = self.has_permission(permission)
        if on and not present:
            self.permissions.append(permission)
        elif present and not on:
            self.permissions.remove(permission)

    def clear_permissions(self):
        """Remove every permission, keeping the same list object."""
        del self.permissions[:]

    def reset_with_acl(self, acl):
        """Overwrite this item's settings with those of *acl*.

        This is useful when an ACL inside an array is "replaced" without
        creating a new item, so that the array order is unchanged.
        """
        self.grantee = acl.grantee
        # Copy the list: sharing it would make later permission changes on
        # either item silently show up on the other.
        self.permissions = list(acl.permissions)
        self.grantor = acl.grantor
        self.grant_option = acl.grant_option

    def __str__(self):
        return self._as_pgsql_string()

    def __repr__(self):
        return "ACLItem('%s',%s,'%s',%s)" % (
            self.grantee, self.permissions, self.grantor, self.grant_option)

class ACLItemArray(list):
    """A list of ACL items that ``append`` keeps to one entry per grantee."""

    def aclitem_for_grantee(self, role):
        """Return the first item whose grantee equals *role*, or None."""
        for acl in self:
            if role == acl.grantee:
                return acl
        return None

    def _require_aclitem(self, role):
        """Return the item for *role*, raising ValueError when there is none."""
        acl = self.aclitem_for_grantee(role)
        if acl is None:
            raise ValueError('ACL for %s not in list' % role)
        return acl

    def has_permission(self, role, permission):
        """Return True when *role* has an item that grants *permission*."""
        acl = self.aclitem_for_grantee(role)
        if acl is None:
            return False
        return acl.has_permission(permission)

    def set_permission(self, role, permission):
        """Turn *permission* on in the existing item for *role*.

        A new item cannot be created automatically because the grantor and
        grant option are unknown here; use ``append`` for that.

        Raises:
            ValueError: if there is no item for *role*.
        """
        return self._require_aclitem(role).set_permission(permission)

    def set_grant_option(self, role, option=True):
        """Set the grant option on the existing item for *role*.

        Raises:
            ValueError: if there is no item for *role*.
        """
        self._require_aclitem(role).grant_option = option

    def clear_permissions(self):
        """Remove every item from the array."""
        del self[:]

    def append(self, acl):
        """Add *acl*, or overwrite the existing item for the same grantee.

        Overwriting in place avoids duplicate items for one grantee and
        keeps the array order stable.
        """
        existing_acl = self.aclitem_for_grantee(acl.grantee)
        if existing_acl is not None:
            existing_acl.reset_with_acl(acl)
        else:
            super(ACLItemArray, self).append(acl)

    def remove_grantee(self, grantee):
        """Remove the item for *grantee*.

        Raises:
            ValueError: if there is no item for *grantee*.
        """
        self.remove(self._require_aclitem(grantee))

    @classmethod
    def _from_pgsql_string(klass, value):
        """Parse an array literal such as ``{agentm=rU/agentm,agentm=X/agentm}``.

        An empty literal (``{}``) yields an empty array.  Each element is
        parsed by ``ACLItem._from_pgsql_string``, which raises ValueError
        for an element it cannot parse.
        """
        # Strip off the surrounding braces.
        value = value.lstrip('{').rstrip('}')

        acl_array = klass()
        if not value:
            # ''.split(',') would give [''], which is not a parseable item.
            return acl_array
        for acl_string in value.split(','):
            acl_array.append(ACLItem._from_pgsql_string(acl_string))
        return acl_array

    def __deepcopy__(self, memo):
        """Return a new array holding deep copies of every item."""
        arr = ACLItemArray()
        for acl in self:
            arr.append(deepcopy(acl, memo))
        return arr

def register_psycopg2_types():
    """Register psycopg2 adapters and typecasters for aclitem and aclitem[].

    After this call, ACLItem and ACLItemArray objects passed as query
    parameters are rendered as SQL literals cast to ``aclitem`` /
    ``aclitem[]``, and result columns with type OID 1033 / 1034 come back as
    ACLItem / ACLItemArray objects.  SQL NULL comes back as None.

    The registration is global (no connection or cursor scope is passed).
    """
    from psycopg2.extensions import (
        adapt, register_adapter, AsIs, new_type, register_type,
    )

    # NOTE(review): getquoted() returns bytes on Python 3, so the string
    # concatenations below assume Python 2 -- confirm before porting.
    def adapt_ACLItem(aclitem):
        """Render one item as a quoted literal cast to aclitem."""
        acl_quoted_string = adapt(aclitem._as_pgsql_string()).getquoted()
        return AsIs(acl_quoted_string + '::aclitem')

    def adapt_ACLItemArray(aclitemarray):
        """Render the array as ``ARRAY[...]::aclitem[]``."""
        adapted_acls = [adapt(acl).getquoted() for acl in aclitemarray]
        return AsIs('ARRAY[' + ','.join(adapted_acls) + ']::aclitem[]')

    register_adapter(ACLItem, adapt_ACLItem)
    register_adapter(ACLItemArray, adapt_ACLItemArray)

    def create_ACLItem(value, cursor):
        """Typecaster: text from the server -> ACLItem (None for NULL)."""
        if value is None:
            # psycopg2 hands NULL to typecasters as None.
            return None
        return ACLItem._from_pgsql_string(value)

    def create_ACLItemArray(value, cursor):
        """Typecaster: text from the server -> ACLItemArray (None for NULL)."""
        if value is None:
            return None
        return ACLItemArray._from_pgsql_string(value)

    # 1033 and 1034 are the pg_type OIDs of aclitem and aclitem[].
    aclitem_caster = new_type((1033,), 'aclitem', create_ACLItem)
    register_type(aclitem_caster)

    aclitem_array_caster = new_type((1034,), 'aclitem[]', create_ACLItemArray)
    register_type(aclitem_array_caster)


-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To post to this group, send email to sqlalchemy@googlegroups.com.
To unsubscribe from this group, send email to 
sqlalchemy+unsubscr...@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/sqlalchemy?hl=en.

Reply via email to