Hi everybody,

I'm trying to use an association proxy for a self-referential relationship
on one of my classes. I've got a Task class, and Tasks can depend on each
other. I also want to store the dependency type ('start-to-start',
'start-to-end', etc.), so AssociationProxy was the way I went. Anyway, my
problem is with events.

Here is an example illustrating the problem:

import logging
from sqlalchemy import event, Column, Integer, String, ForeignKey
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

# Install the default root logging configuration, then make this module's
# logger emit DEBUG-level messages.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Declarative base class that the mapped classes in this example subclass.
Base = declarative_base()


class Task(Base):
    """A task mapped to the ``Tasks`` table.

    Dependencies between tasks are stored as ``TaskDependent`` objects; the
    ``depends`` and ``dependent_of`` association proxies expose the Task at
    the ``to_node`` / ``from_node`` end of those objects respectively.
    """

    __tablename__ = 'Tasks'
    id = Column(Integer, primary_key=True)
    name = Column(String)

    # Status code; this example uses 'WFD' (the constructor default) and
    # 'RTS'. NOTE(review): presumably "waiting for dependency" and "ready to
    # start" -- confirm.
    status = Column(String)

    # Tasks this task depends on: proxies the ``to_node`` of every
    # TaskDependent in ``task_depends_to``. A Task added through the proxy is
    # wrapped by ``creator`` in a new TaskDependent(to_node=...).
    depends = association_proxy(
        'task_depends_to',  # outgoing_edges
        'to_node',  # to_node
        creator=lambda n: TaskDependent(to_node=n)
    )

    # Tasks that depend on this task: proxies the ``from_node`` of every
    # TaskDependent in ``task_dependent_of``. A Task added through the proxy
    # is wrapped by ``creator`` in a new TaskDependent(from_node=...).
    dependent_of = association_proxy(
        'task_dependent_of',  # incoming_edges
        'from_node',  # from_node
        creator=lambda n: TaskDependent(from_node=n)
    )

    # TaskDependent objects whose from_node_id equals this task's id.
    # NOTE(review): TaskDependent.from_node names this attribute in its
    # back_populates, but no back_populates is declared on this side --
    # confirm whether that is intentional.
    task_depends_to = relationship(
        'TaskDependent',
        cascade="all, delete-orphan",
        primaryjoin='Tasks.c.id==Task_Dependencies.c.from_node_id'
    )

    # TaskDependent objects whose to_node_id equals this task's id.
    # NOTE(review): TaskDependent.to_node names this attribute in its
    # back_populates, but no back_populates is declared on this side --
    # confirm whether that is intentional.
    task_dependent_of = relationship(
        'TaskDependent',
        cascade="all, delete-orphan",
        primaryjoin='Tasks.c.id==Task_Dependencies.c.to_node_id'
    )

    def __init__(self, name='', depends=None, status='WFD'):
        """Create a task.

        :param name: task name.
        :param depends: Tasks this task depends on; ``None`` is treated as
            an empty list.
        :param status: initial status code.
        """
        self.name = name
        self.status = status
        if depends is None:
            depends = []
        # Assigned through the ``depends`` association proxy.
        self.depends = depends

    def __repr__(self):
        """Return ``<NAME (Task)>`` using the task's name."""
        return "<%s (Task)>" % self.name

    def update_status_with_dependent_statuses(self):
        """Set ``status`` to 'RTS' when ``depends`` is empty.

        When ``depends`` still contains anything, the status is left
        unchanged.
        """
        if not self.depends:
            self.status = 'RTS'


class TaskDependent(Base):
    """Association object linking one Task to another.

    Mapped to the ``Task_Dependencies`` table. ``from_node_id`` and
    ``to_node_id`` are both foreign keys to ``Tasks.id`` and are both marked
    as primary key columns.
    """

    __tablename__ = "Task_Dependencies"
    from_node_id = Column(
        Integer,
        ForeignKey("Tasks.id"),
        primary_key=True
    )

    # The Task referenced by from_node_id; paired with
    # Task.task_depends_to through back_populates.
    from_node = relationship(
        Task,
        back_populates='task_depends_to',
        primaryjoin='Task.id==TaskDependent.from_node_id',
    )

    to_node_id = Column(
        Integer,
        ForeignKey("Tasks.id"),
        primary_key=True
    )

    # The Task referenced by to_node_id; paired with
    # Task.task_dependent_of through back_populates.
    to_node = relationship(
        Task,
        back_populates='task_dependent_of',
        primaryjoin="TaskDependent.to_node_id==Task.id",
    )

    # Integer value stored with the dependency; its meaning and units are
    # not shown in this example.
    gap = Column(Integer)

    def __init__(self, from_node=None, to_node=None, dependency_type=None,
                 gap=0, gap_model='length'):
        """Create a dependency between two tasks.

        :param from_node: Task assigned to ``from_node``.
        :param to_node: Task assigned to ``to_node``.
        :param dependency_type: stored on the instance as
            ``dependency_type``. NOTE(review): no ``dependency_type`` Column
            is declared on this class, so this looks like a plain instance
            attribute rather than a stored column -- confirm.
        :param gap: value assigned to the ``gap`` column attribute.
        :param gap_model: accepted but not used by this constructor.
        """
        self.to_node = to_node
        self.from_node = from_node
        self.dependency_type = dependency_type
        self.gap = gap


@event.listens_for(Task.task_depends_to, 'remove', propagate=True)
def removed_a_dependency(task, task_dependent, initiator):
    """Handle the 'remove' event of ``Task.task_depends_to``.

    Logs what the listener can see at the time of the call, then asks the
    owning task to refresh its status.

    :param task: the Task whose ``task_depends_to`` collection is changing.
    :param task_dependent: the TaskDependent being removed.
    :param initiator: the event object handed to the listener.
    """
    def report(template, value):
        # Format eagerly so each value is rendered when its line is logged.
        logger.debug(template % value)

    logger.debug('inside removed_a_dependency')
    report('task                    : %s', task)
    report('task.depends            : %s', task.depends)
    report('task_dependent          : %s', task_dependent)
    report('task_dependent.to_node  : %s', task_dependent.to_node)
    report('task_dependent.from_node: %s', task_dependent.from_node)
    report('initiator               : %s', initiator)

    # Update the task status from its dependencies.
    # NOTE(review): in the output posted with this example, task.depends
    # still lists every dependency on each call, so the event appears to
    # fire before the collection is modified -- confirm against the
    # SQLAlchemy attribute-event documentation.
    task.update_status_with_dependent_statuses()


if __name__ == '__main__':
    # Build three tasks; TaskC is created depending on TaskA and TaskB.
    taskA = Task(name='TaskA')
    taskB = Task(name='TaskB')
    taskC = Task(name='TaskC', depends=[taskA, taskB])

    # 'WFD' is the constructor's default status.
    assert taskC.status == 'WFD'
    # Replace the dependency list with an empty one; per the posted output,
    # the 'remove' listener runs once for each removed dependency.
    taskC.depends = []
    assert taskC.depends == []
    # This is the assertion that fails in the posted traceback.
    assert taskC.status == 'RTS'

Outputs:

DEBUG:__main__:inside removed_a_dependency
DEBUG:__main__:task                    : <TaskC (Task)>
DEBUG:__main__:task.depends            : [<TaskA (Task)>, <TaskB (Task)>]
DEBUG:__main__:task_dependent          : <__main__.TaskDependent object at
0x1fa3c10>
DEBUG:__main__:task_dependent.to_node  : <TaskA (Task)>
DEBUG:__main__:task_dependent.from_node: None
DEBUG:__main__:initiator               : <sqlalchemy.orm.attributes.Event
object at 0x1fa3f90>
DEBUG:__main__:inside removed_a_dependency
DEBUG:__main__:task                    : <TaskC (Task)>
DEBUG:__main__:task.depends            : [<TaskA (Task)>, <TaskB (Task)>]
DEBUG:__main__:task_dependent          : <__main__.TaskDependent object at
0x1fa3cd0>
DEBUG:__main__:task_dependent.to_node  : <TaskB (Task)>
DEBUG:__main__:task_dependent.from_node: None
DEBUG:__main__:initiator               : <sqlalchemy.orm.attributes.Event
object at 0x1fa3f90>
Traceback (most recent call last):
  File "/home/eoyilmaz/association_proxy_remove_event.py", line 121, in
<module>
    assert taskC.status == 'RTS'
AssertionError

So, TaskC depends on both TaskA and TaskB. When I set taskC.depends to [],
the remove event is triggered successfully and the
Task.update_status_with_dependent_statuses() method is run. In the end, the
taskC.depends list is [].

The problem is that even when the second event is triggered, I still see
[TaskA, TaskB] in task.depends, where it should at least have shrunk to
[TaskB] by the second run. Therefore, the
update_status_with_dependent_statuses() method always sees some elements in
self.depends and is never able to set the status to RTS.

Does that make sense? Am I missing something in the documentation?

Thanks,

E.Ozgur Yilmaz
eoyilmaz.blogspot.com

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to