Hello list, My schema has changed, and now I want to retrieve my old data (of the old schema) to the new database (with the new schema) from csv files (I export the tables of the old database to csv files and then load those csv files to the new database with some column mapping and some gap filling for new columns)
The problem is that my new schema have a UniqueConstraint for some tables (like : the name column of the city table should be unique within a country). This constraint was not present in the old schema and data is corrupt (two cities with the same name in the same country). So when I try to insert them in the new database, I have IntegrityErrors. The solution I thought of was : * Catch the IntegrityError exception * If it's a problem on a UniqueConstraint, then the exception was raised because I tried to insert instance B that has the same "key" as instance A that was inserted before. * So for all children of B (B's relations), set their parent to A. For example, for all citizens of B, set their city to A, beause A and B ought to be the same. * Then, safely ignore B and move on to the next instance. Here's what has been done so far (that works, I just use psuedo code for "illustration" purpose. If necessary, you can look at the actual attached source files): line = csvloader.next_row() ModelClass = get_current_model() instance = ModelClass.create_instance(**(to_dict(line))) session.add(instance) I wish I could do something like this : try: session.commit() except IntegrityError,e : session.rollback() errror = get_error() if type_of(error) == UniqueConstraintError : original_instance = ModelClass.get(instance.id) for relation in instance.get_relations() : # Is this correct ? instance.relation.inverse = original_instance session.commit() My questions are : how to write get_error, type_of, where to get UniqueContraintError, how to write get_relations, how to set the inverse of a relation (is instance.realtion.inverse the right thing to set ?) and is this approach correct ? -- You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To post to this group, send email to sqlalch...@googlegroups.com. To unsubscribe from this group, send email to sqlalchemy+unsubscr...@googlegroups.com. For more options, visit this group at http://groups.google.com/group/sqlalchemy?hl=en.
#!/usr/bin/env python from coriolis.client.backend.db import DatabaseConnection from coriolis.models import * # heh import sys dbcnx = DatabaseConnection(env="test") mapping = {"print_id":"medium_id", "categ_id":"category_id", "city_id":"daira_id", "side_a":"face_a_id", "side_b":"face_b_id" } def load_all(): dbcnx.init(reset=True,init_data=True) load_list = "wilaya,daira,commune,street_furniture,media_type,print_type,face_type,lesser,convention,site," load_list += "convention_installment,convention_return,media,face" load_list = load_list.split(",") for to_load in load_list : dbcnx.load_data(to_load,mapping) StreetFurniture.get_by(name="PANNEAUX").remove() print "The self checking shall begin. Would you like to skip this operation [y/n] ?" if sys.stdin.readline().strip() in "y,Y".split(","): return print "This is your last chance. Press any key to start or ctrl-c to abort (data has already been commited, don't worry about that)." sys.stdin.readline() for klass in Lesser, Convention, ConventionInstallment, Site, Media: print "self cheking the elements of",klass.__name__ for instance in klass.query.all(): print "cheking",repr(instance) instance.self_check() def load_one(data): dbcnx.init() dbcnx.load_data(data,mapping) if len(sys.argv) > 1: load_one(sys.argv[1]) else: load_all() dbcnx.commit()
# Coriolis from coriolis.client.utils.config import AppConfig from coriolis import models from coriolis.models import * from coriolis.client.utils import Singleton,get_csv,get_logger, db_available from coriolis.client.core.database import BaseDB # ORM import elixir # Python from csv import reader as csvreader class DatabaseConnection(BaseDB,object): __metaclass__ = Singleton session = elixir.session def __init__(self,config=None,env="test",reset=False,init_data=False): """ env should be one of : default/prod, test, user This will match self.config.default_conf, self.config.user_conf, self.config.test_conf etc. """ self.config = config if not config : app_config = AppConfig() self.config = getattr(app_config,env+"_conf") elixir.metadata.bind = self.config.backend.database.url self.session.bind = elixir.metadata.bind self.initialized = False self.mapped = False # boolean print "working on ",elixir.metadata.bind ## This breaks coriolis at startup if the databases are not available ## IT SHOULD NOT HAPPEN BEFORE THE FIRST VIEW IS CALLED!!!! ## AND IT SHOULD GIVE FEEDBACK (using utils.alerts.Alerts methods) ## See coriolis/client/core/gtkui/ui.py:L166 # if reset: # self.reset(init_data) # self.map_models(True) # self.commit() def init(self,reset=False,init_data=False): if reset: self.reset(init_data) self.map_models(True) self.commit() def reset(self,init_data=False): self.empty() if init_data: self.init_data() @db_available def map_models(self,create=False): if self.mapped : return # When set to True, the create parameter creates tables if they do not exist # OBJECTION: creating tables on the fly is NOT Coriolis's job, it's the developer's. # Move this code to test elixir.setup_all(create) self.mapped = True print "map_models ok." return True def get_last_error_message(self): """ Methods decorated with the db_available will store the last_error_message variable as an attribute, so that it can be accessed later. See a usage example in DatabaseConneciton::load_from_csv """ # This is set by the db_available decorator if an error is raised. return self.last_error_message @db_available def commit(self): #~ print "DIRTY OBJECTS BEFORE COMMIT ARE",self.session.dirty self.session.commit() if hasattr(self, "manager"): self.manager.frame.toggle_save(False) self.manager.frame.toggle_check(True) # Leave this alone, it resolves bug 159 #return "OK" # Leave result to None, it resolves bug 159 def flush(self, result=None): print "FLUSHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH" result = self.do_flush(result) if result == "integrity_error": self.rollback() return result if hasattr(self, "manager"): self.manager.frame.toggle_save(True) return result @db_available def do_flush(self,result): """ """ self.session.flush() return result def fallback(self, *args, **kwargs): # (yassine) can't understand ancient vietnamese ... # only fallback is we are getting data, NOT editing elixir.metadata.bind = self.config.backend.database.fallback_url #~ print "falling back to ",elixir.metadata.bind # should we rerun a map_model here? @db_available def rollback(self): self.session.rollback() if hasattr(self, "manager"): self.provide_refresh() self.manager.frame.toggle_save(False) # leave this alone, it resolves bug 159 #return "OK" def provide_refresh(self): """ This method should flush the database and tell the active view to refill itself; obviously. """ self.flush() d = self.manager.active_component.active_mscontroller.top_controller.provide_data() return d def empty(self): """ This is useful for tests when you insert unique keys multiple times. """ if not self.mapped : self.map_models() elixir.drop_all() # Tell people that the database has gone self.mapped = False def init_data(self): """ Populates some constant tables in the database like *_type tables, sanity etc. """ # Did we empty the database just before ? if not self.mapped : # Re-create the database self.map_models(True) # import a long list of objects import initdata self.commit() def add(self,obj): # FIXME: # is there a difference between self.session and elixir.session? # if not, why the jumping around? making the code confusing. elixir.session.add(obj) def get_session(self): return self.session def initialize(self): """ """ self.initialized = True def load_data(self,name,mapping): """ """ parts = name.split("_") csv_file = get_csv(name) class_name = "".join([part.capitalize() for part in parts]) print "name",name print "class_name",class_name klass = getattr(models,class_name) fd = file("integrity.log","a") fd.write("Report for %s" % csv_file) fd.write("*"*72) self.load_from_csv(csv_file,klass,mapping,force_id=True) def load_from_csv(self,filename,model_class,colnames_mapping={},force_id=True): """ """ fd = file("integrity.log","a") #logger = get_logger("error.log") data = csvreader(file(filename)) header = data.next() model_class.force_id = force_id tablename = model_class.mapper.mapped_table.name sequence_name = tablename+"_id_seq" last_id = 0 for line in data: data_dict = self.fix_data(to_dict(header,line,colnames_mapping)) id_int = int(data_dict.get("id")) if last_id < id_int: last_id = id_int inst = model_class(**data_dict) error = self.commit() if error == "integrity_error": error_msg = self.get_last_error_message() fd.write("id: %s"%id_int) # logger.error("error : (%s)" % error_msg) # logger.error("id : (%s)" % id_int) # logger.error("file : (%s)" % filename) # # logger.error("line : (%s)" % line) # # logger.error("data : (%s)" % data_dict) self.rollback() # try: # # flush dosen't really commit, and a rollback will remove all prior VALID instances too # # so we prefer to commit to keep those valid instances # self.commit() # except Exception,e: # print "Exception ???",e # logger.error(str(e)) # logger.error("file : (%s), class : (%s), id (%s)" % (filename,model_class,data_dict.get("id"))) # logger.error("data : (%s)" % data_dict) # self.rollback() last_id +=1 # for safety print "restarting", sequence_name, "at", last_id #fix_seq = DDL('ALTER SEQUENCE {0} RESTART WITH {1}'.format(sequence_name, last_id), bind=elixir.metadata.bind) fix_seq = 'ALTER SEQUENCE {0} RESTART WITH {1}'.format(sequence_name, last_id) self.session.execute(fix_seq) def fix_data(self,data_dict): """ """ self.fix_agency(data_dict) self.fix_user(data_dict) return data_dict def fix_agency(self,data_dict): """ """ agency = data_dict.get("agency") if agency in (None,"") : data_dict["agency"] = "alger" def fix_user(self,data_dict): """ """ user = data_dict.get("user") if user in (None,"") : data_dict["user"] = "coriolis" def to_dict(header,line,colnames_mapping={}): """ Helper function used by DatabaseConection::load_from_csv """ return dict([(colnames_mapping.get(colname,colname),value != "\N" and value or None) for (colname, value) in zip(header,line)])