I haven't found any topics here that address this, which may mean the 
answer is so simple that I'm just overthinking it.

Context: I'm the lone developer on a new tech team at a company that's 
never had a team in place before. Everything is greenfield, which is 
great because I get to build everything from scratch with no legacy code 
to account for, but it's also a lot, because I *have* to build everything 
from scratch.

The ETL pipeline I've prototyped takes a bunch of garbage data from 
SalesForce, tries to make it not-garbage, and puts it back into 
SalesForce. We start by normalizing addresses through shipping APIs, then 
use the normalized addresses to hit various data broker APIs 
(FirstAmerican, Experian, Whitepages, etc.) to get owner information and 
then contact information.

I'm using Apache Airflow as the central automation manager and have it set 
up so that DAGs don't do anything more than call methods on a set of 
objects inside an "app".

Each API we deal with is its own object, with methods that act on records 
according to their current state. I load all the data that needs to be 
processed into Postgres so we aren't constantly bumping into the 
SalesForce API, and I track state inside Postgres with a state table.
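Each of those per-API objects looks roughly like this (a sketch; the class 
and method names are illustrative):

    class WhitepagesApi:
        """Wraps one broker API; each method handles records in one state."""

        def __init__(self, session):
            self.session = session  # SQLAlchemy session bound to Postgres

        def find_owner_info(self):
            """Process every record currently in the 'address valid' state."""
            # Query by state (see the per-stage query sketch further down),
            # call the external API, then write the results and the new
            # state back to Postgres.
            ...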

Problem: what's the idiomatic way to manage this? I have something that 
works, but because this is all new, and our team will be growing soon, I 
want to lay down a solid foundation for when we move beyond proof of 
concept.

Model examples:

After the raw SalesForce data is loaded into a dumb table with no 
constraints, we load the kickoff table, which looks like this:

from sqlalchemy import BigInteger, Column, ForeignKey, Text, text
from sqlalchemy.orm import relationship

# Base and the referenced models (AccountRawData, AccountDataSource, etc.)
# are defined elsewhere in the app.


class AccountIngestedData(Base):
    __tablename__ = "account_ingested_data"
    __table_args__ = {"schema": "datablender"}

    account_ingested_data_id = Column(
        BigInteger,
        primary_key=True,
        server_default=text("nextval('account_ingested_data_id_seq'::regclass)"),
    )
    salesforce_id = Column(ForeignKey(AccountRawData.id), nullable=False)
    account_data_source_id = Column(
        ForeignKey(AccountDataSource.account_data_source_id), nullable=False
    )
    account_enrichment_source_field_mapper_id = Column(
        ForeignKey(
            AccountEnrichmentSourceFieldMapper.account_enrichment_source_field_mapper_id
        ),
        nullable=False,
    )
    account_state_id = Column(
        ForeignKey(AccountState.account_state_id), nullable=False
    )
    account_experian_file_id = Column(
        ForeignKey(AccountExperianFile.account_experian_file_id), nullable=True
    )

    account_name = Column(Text, nullable=False)
    account_address1 = Column(Text, nullable=False)
    account_address2 = Column(Text, nullable=False)
    account_city = Column(Text, nullable=False)
    account_state = Column(Text, nullable=False)
    account_zip = Column(Text, nullable=False)

    account_data_source = relationship("AccountDataSource")
    account_enrichment_source_field_mapper = relationship(
        "AccountEnrichmentSourceFieldMapper"
    )
    account_experian_file = relationship("AccountExperianFile")
    # Named *_rel so it doesn't clobber the account_state Text column above.
    account_state_rel = relationship("AccountState")
    salesforce = relationship("AccountRawData")

And we have a state table:

class AccountState(Base):
    __tablename__ = "account_state"
    __table_args__ = {"schema": "datablender"}

    account_state_id = Column(
        BigInteger,
        primary_key=True,
        server_default=text("nextval('account_state_id_seq'::regclass)"),
    )
    account_state_description = Column(Text, nullable=False)

Example values for AccountState look like:

100000, "ingested"
100001, "address valid"
100002, "address invalid"
100003, "address error"
100004, "address manual review"
100005, "owner info found"
100006, "owner info ambiguous"
100007, "owner info error"
100008, "owner info manual review"

Of course this list keeps growing as we add more integrations. At each 
stage of the automated pipeline, there's a query to get the records in 
the corresponding state.
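Each of those stage queries looks roughly like this (a sketch; assumes a 
configured SQLAlchemy session, and spells out the join condition):

    ready_for_owner_lookup = (
        session.query(AccountIngestedData)
        .join(
            AccountState,
            AccountIngestedData.account_state_id == AccountState.account_state_id,
        )
        .filter(AccountState.account_state_description == "address valid")
        .all()
    )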


My question is this: what's the cleanest way to assign these foreign key 
state IDs on the accounts table as records move through the pipeline?

I feel like there must be a really obvious solution that doesn't involve 
a global Python dictionary that has to be updated every time a new status 
is added, and that also doesn't fire another query against the status 
table for every record that gets loaded. But my brain can't figure out 
how to make that happen.
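For concreteness, the workaround I'm trying to avoid is something like 
this (values taken from the example list above; `record` stands in for an 
AccountIngestedData instance):

    # Global mapping I'd have to keep in sync with the account_state table
    # by hand every time we add an integration.
    ACCOUNT_STATES = {
        "ingested": 100000,
        "address valid": 100001,
        "address invalid": 100002,
        # ...and so on for every new status
    }

    record.account_state_id = ACCOUNT_STATES["address valid"]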

Really curious if anyone has thoughts on this pattern. It's obviously a 
common one; I just haven't seen the SQLAlchemy implementation before.

