On Sep 23, 2013, at 8:13 AM, Piotr Deszyński <deszyn...@red-sky.pl> wrote:

> I'm tryign to do a bulk update using sqlalchemy ShardedSession. I've stumbled 
> on a problem when i have to do a bulk update without selecting rows from a 
> database. Here's how I've tried to do it:
> 
> class Row(Base, object): #pylint: disable=I0011, R0903
>     '''
>     Row
>     '''
> 
>     __tablename__ = 'my_table'
> 
>     some_primary = Column(Integer(11), primary_key=True)
>     sharded_attribute = Column(String())
>     updated_attribute = Column(Integer(2))
> 
> Session = scoped_session(sessionmaker(class_=ShardedSession))
> Session.configure(
>     query_chooser=lambda x: SHARDS_LIST,
>     id_chooser=lambda x, y: SHARDS_LIST,
>     shard_chooser=my_shard_chooser,
>     autocommit=False
> )
> Then I'm making a query as follows:
> 
> session.query(Row) \
>     .filter_by(Row.sharded_attribute.in_(attributes_list)) \
>     .update({Row.updated_attribute: 1}, synchronize_session=False)

the current implementation for ShardedQuery doesn't have support for the 
update() or delete() methods.  Those methods were added to Query long after 
ShardedQuery was developed, and implementing them transparently is slightly 
non-trivial (essentially has to emit the UPDATE or DELETE across all shards 
that apply to the query_chooser, in the same way that _execute_and_instances 
does).

> So the problem is, that sharded_attribute can be a list of rows which are in 
> different shards. Using such a query SQLAlchemy calls my_shard_chooser. In 
> this function I'm determining a shard or a shard list depending on the 
> params. There's a problem during the bulk update thought, becauseinstance 
> param is None, so I don't have value of sharded_attribute. In that case 
> there's passed clause attribute, so using it I'm able to determine what 
> values of sharded_attribute were provided. Sadly these items might be in 
> different shards and I cannot return a list of shards ids (only single id is 
> supported).
> 
> Does anybody know how I can change my approach to make it working? Do I have 
> to resign from using a session? Is there at all a way to do it other than 
> doing firstly select for all updated rows and then updating objects and 
> calling commit or making direct calls on engine's execute?
> 

If I'm understanding correctly this is all due to update()/delete() not being 
implemented.    Looking at how these work right now, there's not a 
straightforward path to override how they execute queries the way 
_execute_and_instances() does, so you might be better off implementing your own 
update()/delete() that's similar to how ShardedQuery._execute_and_instances 
works, but uses core table.update() and table.delete() constructs.

Attachment: signature.asc
Description: Message signed with OpenPGP using GPGMail

Reply via email to