I haven't found any topics here that address this, so it may mean that the
answer is so simple that I'm just overthinking here.
Context: I'm the lone developer on a new tech team at a company that's
never had any team in place before. Everything is greenfield. Which is
great because I get to do everything from scratch with no legacy stuff to
account for. But it's also kind of a lot because I *have* to do everything
from scratch.
The ETL pipeline I've prototyped takes a bunch of garbage data from
SalesForce and tries to make it not-garbage and put it back into
SalesForce. We start by normalizing addresses through shipping APIs, then
use the normalized addresses to query various data broker APIs
(FirstAmerican, Experian, Whitepages, etc.) for owner information,
followed by contact information.
I'm using Apache Airflow as a central automation manager, set up so that
the DAGs don't do anything more than call methods on a set of objects
inside an "app".
Each API that we deal with is its own object and has methods for dealing
with what's there based on current state. I load all the data that needs to
be processed into postgres so we aren't constantly bumping into the
SalesForce API, and I track state inside postgres with a state table.
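To make the shape concrete, each API object looks roughly like this (heavily simplified; the class name, the injected client, and the row/state handling are illustrative stand-ins, not my real code):

```python
class AddressNormalizer:
    """Wraps one external API (here, a shipping/address API).

    Airflow tasks do nothing but call methods like normalize_pending();
    all the real logic lives here so it can be tested without Airflow.
    """

    def __init__(self, client):
        # client is an injected thin wrapper over the HTTP API,
        # so a fake can be substituted in tests
        self.client = client

    def normalize_pending(self, rows):
        """Process rows currently in the 'ingested' state.

        Returns (row, new_state_description) pairs; the caller persists
        the new state back to the postgres state table.
        """
        out = []
        for row in rows:
            try:
                row["account_address1"] = self.client.normalize(
                    row["account_address1"]
                )
                out.append((row, "address valid"))
            except ValueError:
                out.append((row, "address invalid"))
        return out
```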
Problem: what's the idiomatic way to manage this? I have something that
works, but because this is all new, and our team will be growing soon, I
want to lay down a solid foundation for when we move beyond proof of
concept.
Model examples:
After the raw SalesForce data is loaded into a dumb table with no
restrictions, we load the kickoff table that looks like this:
from sqlalchemy import BigInteger, Column, ForeignKey, Text, text
from sqlalchemy.orm import relationship


class AccountIngestedData(Base):
    __tablename__ = "account_ingested_data"
    __table_args__ = {"schema": "datablender"}

    account_ingested_data_id = Column(
        BigInteger,
        primary_key=True,
        server_default=text("nextval('account_ingested_data_id_seq'::regclass)"),
    )
    salesforce_id = Column(ForeignKey(AccountRawData.id), nullable=False)
    account_data_source_id = Column(
        ForeignKey(AccountDataSource.account_data_source_id), nullable=False
    )
    account_enrichment_source_field_mapper_id = Column(
        ForeignKey(
            AccountEnrichmentSourceFieldMapper.account_enrichment_source_field_mapper_id
        ),
        nullable=False,
    )
    account_state_id = Column(
        ForeignKey(AccountState.account_state_id), nullable=False
    )
    account_experian_file_id = Column(
        ForeignKey(AccountExperianFile.account_experian_file_id), nullable=True
    )
    account_name = Column(Text, nullable=False)
    account_address1 = Column(Text, nullable=False)
    account_address2 = Column(Text, nullable=False)
    account_city = Column(Text, nullable=False)
    account_state = Column(Text, nullable=False)
    account_zip = Column(Text, nullable=False)

    account_data_source = relationship("AccountDataSource")
    account_enrichment_source_field_mapper = relationship(
        "AccountEnrichmentSourceFieldMapper"
    )
    account_experian_file = relationship("AccountExperianFile")
    # renamed from account_state: the original name shadowed the
    # account_state (US state) Text column above, silently dropping it
    state = relationship("AccountState")
    salesforce = relationship("AccountRawData")
And we have a state table:
class AccountState(Base):
    __tablename__ = "account_state"
    __table_args__ = {"schema": "datablender"}

    account_state_id = Column(
        BigInteger,
        primary_key=True,
        server_default=text("nextval('account_state_id_seq'::regclass)"),
    )
    account_state_description = Column(Text, nullable=False)
Example values for AccountState look like:
100000, "ingested"
100001, "address valid"
100002, "address invalid"
100003, "address error"
100004, "address manual review"
100005, "owner info found"
100006, "owner info ambiguous"
100007, "owner info error"
100008, "owner info manual review"
Of course this list goes on and on as we add more and more integrations. At
each stage of the automated pipeline, there's a query to get the records
that correspond to each state.
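For concreteness, a stage query can key on the description rather than a hardcoded ID; here's the shape of it sketched against in-memory sqlite with the tables pared way down (production is postgres via SQLAlchemy, and whether keying on description or id is right is part of my question):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE account_state (
        account_state_id INTEGER PRIMARY KEY,
        account_state_description TEXT NOT NULL UNIQUE
    );
    CREATE TABLE account_ingested_data (
        account_ingested_data_id INTEGER PRIMARY KEY,
        account_name TEXT NOT NULL,
        account_state_id INTEGER NOT NULL REFERENCES account_state
    );
    INSERT INTO account_state VALUES
        (100000, 'ingested'), (100001, 'address valid');
    INSERT INTO account_ingested_data VALUES
        (1, 'Acme', 100000), (2, 'Globex', 100001);
""")

# pull the records for the "ingested" stage without a magic number
rows = conn.execute("""
    SELECT d.account_ingested_data_id, d.account_name
    FROM account_ingested_data AS d
    JOIN account_state AS s USING (account_state_id)
    WHERE s.account_state_description = 'ingested'
""").fetchall()
```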
My question is this: what's the cleanest way to assign the foreign key IDs
to the accounts table based on what's happening?
I feel like there must be a really obvious solution to this that doesn't
involve a global python dictionary that has to be updated every time a new
status is added, and also doesn't have to bang another query on the status
table for every record that gets loaded. But my brain can't figure out how
to make this happen.
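To be explicit about what I'm groping toward: the best I've come up with is loading the state table once per run into an in-memory map, so a new status only needs a DB row, with no hand-edited dict and no per-record query. But I don't know if this is the idiomatic way. Rough sketch (sqlite stands in for postgres, and the StateLookup class is hypothetical):

```python
import sqlite3


class StateLookup:
    """Hypothetical helper: reads account_state once per run and resolves
    descriptions to ids in memory."""

    def __init__(self, conn):
        # one query, at startup; the cursor yields (description, id) pairs
        self._ids = dict(
            conn.execute(
                "SELECT account_state_description, account_state_id"
                " FROM account_state"
            )
        )

    def __getitem__(self, description):
        return self._ids[description]


# sqlite stand-in just to show the shape
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE account_state ("
    " account_state_id INTEGER PRIMARY KEY,"
    " account_state_description TEXT NOT NULL UNIQUE)"
)
conn.executemany(
    "INSERT INTO account_state VALUES (?, ?)",
    [(100000, "ingested"), (100001, "address valid")],
)

states = StateLookup(conn)  # one query per run
# assigning the FK is then just:
# row.account_state_id = states["address valid"]
```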
Really curious if anyone has thoughts about this pattern. It's obviously
common, I just haven't seen the SQLA implementation before.
--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper
http://www.sqlalchemy.org/