On Jan 14, 2011, at 3:26 PM, A.M. wrote: > I suspect I have to implement special converter methods with psycopg2 > register_adapter/new_type. That is what I am experimenting with now.
It looks like SQLAlchemy's array type support doesn't support anything beyond basic built-in types that accept numbers or text values. (Please do correct me if I am wrong.) I suspect this is a limitation of the default psycopg2 array support which does not allow passing a type along with the array during execution. It also seems to always bind arrays inline. I worked around this by installing type converters directly in psycopg2. SQLAlchemy then ends up with the values as pre-cooked ACLItem and ACLItemArray objects. From an external API standpoint, nothing changed. One stumbling block I had was that the code sample in the docs was too slim and there is no UserDefinedType example in the examples directory, so I spent a lot of time in the debugger. I hope this code may help others. Here is the code that worked: ====================== import sqlalchemy.types as types import re import sqlalchemy.exc from sqlalchemy.dialects.postgresql import ARRAY from project.lib.aclitem import ACLItem,ACLItemArray from copy import deepcopy #include/utils/acl.h #define ACL_ALL_RIGHTS_STR "arwdDxtXUCTc" #update for SQLAlchemy 0.7: http://www.sqlalchemy.org/docs/07/orm/extensions/mutable.html class ACLItemArrayType(types.MutableType,types.UserDefinedType,types.Concatenable): def get_col_spec(self): return 'aclitem[]' def bind_processor(self,dialect): return None def result_process(self,dialect): return None def copy_value(self,value): return deepcopy(value) def compare_values(self,a,b): return a == b class ACLItemType(types.MutableType,types.UserDefinedType): #class ACLItemType(types.UserDefinedType,types.MutableType): #FAIL def get_col_spec(self): return 'aclitem' def bind_processor(self,dialect): return None def copy_value(self,value): return deepcopy(value) def result_processor(self,dialect,column_type): return None =========== import re from copy import deepcopy class ACLItem(object): def __init__(self,grantee,permissions,grantor,grant_option=False): self.grantee = grantee self.permissions = [] if permissions: for p in permissions: self.permissions.append(p) self.grantor = grantor self.grant_option = grant_option def _as_pgsql_string(self): #convert to string 'user <grantee>=<perms>/<grantor>' string_perms = '' for perm in self.permissions: string_perms += perm if self.grant_option: grant_option = '*' else: grant_option = '' return "user %s=%s%s/%s" % (self.grantee,string_perms,grant_option,self.grantor) def __deepcopy__(self,memo): return ACLItem(self.grantee,self.permissions,self.grantor,self.grant_option) def __copy__(self): return deepcopy(self) @classmethod def _from_pgsql_string(klass,aclstring): "grantee=perms*/grantor" matches = re.match('([^=]+)=([^/\*]+)(\*?)/(\w+)',aclstring) if matches is None: raise ValueError('string does not appear to represent a PostgreSQL ACL') grantee = matches.group(1) permissions = matches.group(2) grant_option = bool(len(matches.group(3))) grantor = matches.group(4) return ACLItem(grantee,permissions,grantor,grant_option) def __eq__(self,other_object): if not isinstance(other_object,self.__class__): return False return str(self) == str(other_object) def has_permission(self,permission_test): return permission_test in self.permissions def set_permission(self,permission,on=True): if not self.has_permission(permission) and on: self.permissions.append(permission) elif self.has_permission(permission) and not on: self.permissions.remove(permission) def clear_permissions(self): del self.permissions[:] def reset_with_acl(self,acl): """takes an acl and replaces its own settings with the argument settings This is useful for cases where an acl in an array is "replaced" without being creating a new aclitem so that the array order in unmodified """ self.grantee = acl.grantee self.permissions = acl.permissions self.grantor = acl.grantor self.grant_option = acl.grant_option def __str__(self): return self._as_pgsql_string() def __repr__(self): return "ACLItem('%s',%s,'%s',%s)" % (self.grantee,self.permissions,self.grantor,self.grant_option) class ACLItemArray(list): #in an aclitem array, the def aclitem_for_grantee(self,role): for acl in self: if role == acl.grantee: return acl return None def has_permission(self,role,permission): existingacl = self.aclitem_for_grantee(role) if existingacl: return existingacl.has_permission(permission) else: return False def set_permission(self,role,permission): #cannot automatically create new ACLItem because we don't have grant_option, grantor- use append instead return self.aclitem_for_grantee(role).set_permission(permission) def set_grant_option(self,role,option=True): self.aclitem_for_grantee(role).grant_option = option def clear_permissions(self): del self[:] def append(self,acl): #if there is already an acl for this user, replace with new acl (so that there are not duplicate acls for the grantee) existing_acl = self.aclitem_for_grantee(acl.grantee) if existing_acl: existing_acl.reset_with_acl(acl) else: super(ACLItemArray,self).append(acl) def remove_grantee(self,grantee): existing_acl = self.aclitem_for_grantee(grantee) if existing_acl: self.remove(existing_acl) else: raise ValueError('ACL for %s not in list' % grantee) @classmethod def _from_pgsql_string(klass,value): #{agentm=rU/agentm,agentm=X/agentm} #strip off the surrounding braces value = value.lstrip('{') value = value.rstrip('}') acl_strings = value.split(',') acl_array = ACLItemArray() for acl_string in acl_strings: acl = ACLItem._from_pgsql_string(acl_string) acl_array.append(acl) return acl_array def __deepcopy__(self,memo): arr = ACLItemArray() for acl in self: arr.append(deepcopy(acl,memo)) return arr def register_psycopg2_types(): from psycopg2.extensions import adapt, register_adapter, AsIs, QuotedString, new_type, register_type def adapt_ACLItem(aclitem): acl_quoted_string = adapt(aclitem._as_pgsql_string()).getquoted() return AsIs(acl_quoted_string + '::aclitem') def adapt_ACLItemArray(aclitemarray): adapted_acls = [] for acl in aclitemarray: adapted_acls.append(adapt(acl).getquoted()) return AsIs('ARRAY[' + ','.join(adapted_acls) + ']::aclitem[]') register_adapter(ACLItem,adapt_ACLItem) register_adapter(ACLItemArray,adapt_ACLItemArray) def create_ACLItem(value,cursor): return ACLItem._from_pgsql_string(value) def create_ACLItemArray(value,cursor): return ACLItemArray._from_pgsql_string(value) ACLItemType = new_type((1033,),'aclitem',create_ACLItem) register_type(ACLItemType) ACLItemArrayType = new_type((1034,),'aclitem[]',create_ACLItemArray) register_type(ACLItemArrayType) -- You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To post to this group, send email to sqlalchemy@googlegroups.com. To unsubscribe from this group, send email to sqlalchemy+unsubscr...@googlegroups.com. For more options, visit this group at http://groups.google.com/group/sqlalchemy?hl=en.