Re: Testing ETL with Spark using Pytest

2021-02-09 Thread Mich Talebzadeh
Many thanks Marco.

Points noted and other points/criticism are equally welcome. In a forum
like this we do  not disagree, we just agree to differ so to speak and
share ideas.

I will review my code and take onboard your suggestions.

regards,

Mich




LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 9 Feb 2021 at 18:09, Sofia’s World  wrote:

> Hey Mich
>  my 2 cents on top of Jerry's.
> for reusable @fixtures across your tests, i'd leverage conftest.py and put
> all of them there  -if number is not too big. OW. as you say, you can
> create  tests\fixtures where you place all of them there
>
> in term of extractHiveDAta for a @fixture it is doing too much
> A fixture in pytest - anyone correct if wrong - its just an object you can
> reuse across tests, something like this below. it should contain very
> minimal code.. I'd say not more than 3 lines..
>
> @fixture
> def spark():
>  return SparkSession()
>
> def test_mydataframe(spark):
>mydf = spark.table("mypreferredtable")
>
> It seems to me your extractHiveDAta is doing too much.
> IMHO it should be:
>
> @pytest.fixture
> def hive_extractor():
>  return 
>
> @pytext.fixture
> def default_config():
>  return 
>
> def test_extraction_from_hive(spark, hive_extractor, default_config):
>   tableName = config['GCPVariables']['sourceTable']
>fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' +
> tableName
>house_df = s.loadTableFromHiveJDBC(spark_session,
> fullyQualifiedTableName)
># To test your dataframe, do something like this
>test_df_pandas  =  .from_csv("""regionName,col2,col3
> Kensington and
> chelsea,Value2,Value3""")
>test_df = spark.createDataFrame(test_df_pandas)
>result_df = house_df.subtract(test_df)
>self.assertEquals(0, result_df.count())
>
> as always, pls feel free to disagree havent done much on pytest/
> fixtures but this is how i'd restructure..
>
> hth
>  Marco
>
>
>
> On Tue, Feb 9, 2021 at 5:37 PM Mich Talebzadeh 
> wrote:
>
>> Interesting points Jerry. I do not know how much atomising the unit test
>> brings benefit.
>>
>> For example we have
>>
>> @pytest.fixture(scope = "session")
>> def extractHiveData():
>> # read data through jdbc from Hive
>> spark_session = s.spark_session(ctest['common']['appName'])
>> tableName = config['GCPVariables']['sourceTable']
>> fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' +
>> tableName
>>house_df = s.loadTableFromHiveJDBC(spark_session,
>> fullyQualifiedTableName)
>> # sample data selected equally n rows from Kensington and Chelsea and
>> n rows from City of Westminster
>> num_rows = int(ctest['statics']['read_df_rows']/2)
>> house_df = house_df.filter(col("regionname") == "Kensington and
>> Chelsea").limit(num_rows).unionAll(house_df.filter(col("regionname") ==
>> "City of Westminster").limit(num_rows))
>> return house_df
>>
>> Notes:
>>
>> That spark_session is imported from a packaged and has been tested many
>> times
>>
>> The config static values are read through a python file config.py in turn
>> reading a yml file config.yml
>>
>> The important ones to test is house_df, the data frame to read from the
>> Hive table. That can fail for a variety of reasons.
>>
>>
>>1. The Hive driver used is old or out of date
>>2. The Hive driver does not support kerberized access that may be the
>>case in production
>>
>> So any unit testing is going to be limited by scope. Also another point
>> being is that if the extract data module fails then you are going to know
>> that by calling it and probably can be rectified pretty quick. It is always
>> the issue of coverage. How much testing needs to be covered.
>>
>>
>> HTH
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 9 Feb 2021 at 16:34, Jerry Vinokurov 
>> wrote:
>>
>>> Sure, I think it makes sense in many cases to break things up like this.
>>> Looking at your other example I'd say that you 

Re: Testing ETL with Spark using Pytest

2021-02-09 Thread Sofia’s World
Hey Mich
 my 2 cents on top of Jerry's.
for reusable @fixtures across your tests, i'd leverage conftest.py and put
all of them there  -if number is not too big. OW. as you say, you can
create  tests\fixtures where you place all of them there

in term of extractHiveDAta for a @fixture it is doing too much
A fixture in pytest - anyone correct if wrong - its just an object you can
reuse across tests, something like this below. it should contain very
minimal code.. I'd say not more than 3 lines..

@fixture
def spark():
 return SparkSession()

def test_mydataframe(spark):
   mydf = spark.table("mypreferredtable")

It seems to me your extractHiveDAta is doing too much.
IMHO it should be:

@pytest.fixture
def hive_extractor():
 return 

@pytext.fixture
def default_config():
 return 

def test_extraction_from_hive(spark, hive_extractor, default_config):
  tableName = config['GCPVariables']['sourceTable']
   fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' +
tableName
   house_df = s.loadTableFromHiveJDBC(spark_session,
fullyQualifiedTableName)
   # To test your dataframe, do something like this
   test_df_pandas  =  .from_csv("""regionName,col2,col3
Kensington and chelsea,Value2,Value3""")
   test_df = spark.createDataFrame(test_df_pandas)
   result_df = house_df.subtract(test_df)
   self.assertEquals(0, result_df.count())

as always, pls feel free to disagree havent done much on pytest/
fixtures but this is how i'd restructure..

hth
 Marco



On Tue, Feb 9, 2021 at 5:37 PM Mich Talebzadeh 
wrote:

> Interesting points Jerry. I do not know how much atomising the unit test
> brings benefit.
>
> For example we have
>
> @pytest.fixture(scope = "session")
> def extractHiveData():
> # read data through jdbc from Hive
> spark_session = s.spark_session(ctest['common']['appName'])
> tableName = config['GCPVariables']['sourceTable']
> fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' +
> tableName
>house_df = s.loadTableFromHiveJDBC(spark_session,
> fullyQualifiedTableName)
> # sample data selected equally n rows from Kensington and Chelsea and
> n rows from City of Westminster
> num_rows = int(ctest['statics']['read_df_rows']/2)
> house_df = house_df.filter(col("regionname") == "Kensington and
> Chelsea").limit(num_rows).unionAll(house_df.filter(col("regionname") ==
> "City of Westminster").limit(num_rows))
> return house_df
>
> Notes:
>
> That spark_session is imported from a packaged and has been tested many
> times
>
> The config static values are read through a python file config.py in turn
> reading a yml file config.yml
>
> The important ones to test is house_df, the data frame to read from the
> Hive table. That can fail for a variety of reasons.
>
>
>1. The Hive driver used is old or out of date
>2. The Hive driver does not support kerberized access that may be the
>case in production
>
> So any unit testing is going to be limited by scope. Also another point
> being is that if the extract data module fails then you are going to know
> that by calling it and probably can be rectified pretty quick. It is always
> the issue of coverage. How much testing needs to be covered.
>
>
> HTH
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 9 Feb 2021 at 16:34, Jerry Vinokurov 
> wrote:
>
>> Sure, I think it makes sense in many cases to break things up like this.
>> Looking at your other example I'd say that you might want to break up
>> extractHiveData into several fixtures (one for session, one for config, one
>> for the df) because in my experience fixtures like those are reused
>> constantly across a test suite. In general I try to keep my fixtures to one
>> concrete task only, so that if I find myself repeating a pattern I just
>> factor it out into another fixture.
>>
>> On Tue, Feb 9, 2021 at 11:14 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thanks Jerry for your comments.
>>>
>>> The easiest option and I concur is to have all these fixture files
>>> currently under fixtures package lumped together in conftest.py under
>>> * tests* package.
>>>
>>> Then you can get away all together from fixtures and it works. However,
>>> I gather plug and play becomes less manageable when you have a large number
>>> of fixtures (large being relative here). My main modules (not tests) are
>>> designed to do ETL from any database that supports JDBC connections (bar
>>> 

Re: Testing ETL with Spark using Pytest

2021-02-09 Thread Mich Talebzadeh
Interesting points Jerry. I do not know how much atomising the unit test
brings benefit.

For example we have

@pytest.fixture(scope = "session")
def extractHiveData():
# read data through jdbc from Hive
spark_session = s.spark_session(ctest['common']['appName'])
tableName = config['GCPVariables']['sourceTable']
fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' +
tableName
   house_df = s.loadTableFromHiveJDBC(spark_session,
fullyQualifiedTableName)
# sample data selected equally n rows from Kensington and Chelsea and n
rows from City of Westminster
num_rows = int(ctest['statics']['read_df_rows']/2)
house_df = house_df.filter(col("regionname") == "Kensington and
Chelsea").limit(num_rows).unionAll(house_df.filter(col("regionname") ==
"City of Westminster").limit(num_rows))
return house_df

Notes:

That spark_session is imported from a packaged and has been tested many
times

The config static values are read through a python file config.py in turn
reading a yml file config.yml

The important ones to test is house_df, the data frame to read from the
Hive table. That can fail for a variety of reasons.


   1. The Hive driver used is old or out of date
   2. The Hive driver does not support kerberized access that may be the
   case in production

So any unit testing is going to be limited by scope. Also another point
being is that if the extract data module fails then you are going to know
that by calling it and probably can be rectified pretty quick. It is always
the issue of coverage. How much testing needs to be covered.


HTH



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 9 Feb 2021 at 16:34, Jerry Vinokurov  wrote:

> Sure, I think it makes sense in many cases to break things up like this.
> Looking at your other example I'd say that you might want to break up
> extractHiveData into several fixtures (one for session, one for config, one
> for the df) because in my experience fixtures like those are reused
> constantly across a test suite. In general I try to keep my fixtures to one
> concrete task only, so that if I find myself repeating a pattern I just
> factor it out into another fixture.
>
> On Tue, Feb 9, 2021 at 11:14 AM Mich Talebzadeh 
> wrote:
>
>> Thanks Jerry for your comments.
>>
>> The easiest option and I concur is to have all these fixture files
>> currently under fixtures package lumped together in conftest.py under
>> * tests* package.
>>
>> Then you can get away all together from fixtures and it works. However, I
>> gather plug and play becomes less manageable when you have a large number
>> of fixtures (large being relative here). My main modules (not tests) are
>> designed to do ETL from any database that supports JDBC connections (bar
>> Google BigQuery that only works correctly with Spark API). You specify your
>> source DB and target DB in yml file for any pluggable JDBC database
>>
>> Going back to Pytest, please  check this reference below for the reason
>> for fixtures packaging
>>
>> How to modularize your py.test fixtures (github.com)
>> 
>>
>> With regard to your other point on fixtures (a fixture in each file), I
>> have this fixture *loadIntoMysqlTable() *where it uses the data frame
>> created in* extractHiveData*, reads sample records from Hive and
>> populates MySql test table. The input needed is the Dataframe that is
>> constructed in the fixture module extractHiveData which has been passed as
>> parameter to this. This is the only way it seems to work through my tests
>>
>>
>> @pytest.fixture(scope = "session")
>> def extractHiveData():
>> # read data through jdbc from Hive
>> spark_session = s.spark_session(ctest['common']['appName'])
>> tableName = config['GCPVariables']['sourceTable']
>> fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' +
>> tableName
>>house_df = s.loadTableFromHiveJDBC(spark_session,
>> fullyQualifiedTableName)
>> # sample data selected equally n rows from Kensington and Chelsea and
>> n rows from City of Westminster
>> num_rows = int(ctest['statics']['read_df_rows']/2)
>> house_df = house_df.filter(col("regionname") == "Kensington and
>> Chelsea").limit(num_rows).unionAll(house_df.filter(col("regionname") ==
>> "City of Westminster").limit(num_rows))
>> return house_df
>>
>> @pytest.fixture(scope = "session")
>> def loadIntoMysqlTable(*extractHiveData*):
>> try:
>> *extractHiveData*. \
>>

Re: Testing ETL with Spark using Pytest

2021-02-09 Thread Jerry Vinokurov
Sure, I think it makes sense in many cases to break things up like this.
Looking at your other example I'd say that you might want to break up
extractHiveData into several fixtures (one for session, one for config, one
for the df) because in my experience fixtures like those are reused
constantly across a test suite. In general I try to keep my fixtures to one
concrete task only, so that if I find myself repeating a pattern I just
factor it out into another fixture.

On Tue, Feb 9, 2021 at 11:14 AM Mich Talebzadeh 
wrote:

> Thanks Jerry for your comments.
>
> The easiest option and I concur is to have all these fixture files
> currently under fixtures package lumped together in conftest.py under
> * tests* package.
>
> Then you can get away all together from fixtures and it works. However, I
> gather plug and play becomes less manageable when you have a large number
> of fixtures (large being relative here). My main modules (not tests) are
> designed to do ETL from any database that supports JDBC connections (bar
> Google BigQuery that only works correctly with Spark API). You specify your
> source DB and target DB in yml file for any pluggable JDBC database
>
> Going back to Pytest, please  check this reference below for the reason
> for fixtures packaging
>
> How to modularize your py.test fixtures (github.com)
> 
>
> With regard to your other point on fixtures (a fixture in each file), I
> have this fixture *loadIntoMysqlTable() *where it uses the data frame
> created in* extractHiveData*, reads sample records from Hive and
> populates MySql test table. The input needed is the Dataframe that is
> constructed in the fixture module extractHiveData which has been passed as
> parameter to this. This is the only way it seems to work through my tests
>
>
> @pytest.fixture(scope = "session")
> def extractHiveData():
> # read data through jdbc from Hive
> spark_session = s.spark_session(ctest['common']['appName'])
> tableName = config['GCPVariables']['sourceTable']
> fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' +
> tableName
>house_df = s.loadTableFromHiveJDBC(spark_session,
> fullyQualifiedTableName)
> # sample data selected equally n rows from Kensington and Chelsea and
> n rows from City of Westminster
> num_rows = int(ctest['statics']['read_df_rows']/2)
> house_df = house_df.filter(col("regionname") == "Kensington and
> Chelsea").limit(num_rows).unionAll(house_df.filter(col("regionname") ==
> "City of Westminster").limit(num_rows))
> return house_df
>
> @pytest.fixture(scope = "session")
> def loadIntoMysqlTable(*extractHiveData*):
> try:
> *extractHiveData*. \
> write. \
> format("jdbc"). \
> option("url", test_url). \
> option("dbtable", ctest['statics']['sourceTable']). \
> option("user", ctest['statics']['user']). \
> option("password", ctest['statics']['password']). \
> option("driver", ctest['statics']['driver']). \
> mode(ctest['statics']['mode']). \
> save()
> return True
> except Exception as e:
> print(f"""{e}, quitting""")
> sys.exit(1)
>
> Thanks again.
>
>
> Mich
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 9 Feb 2021 at 15:47, Jerry Vinokurov 
> wrote:
>
>> Hi Mich,
>>
>> I'm a bit confused by what you mean when you say that you cannot call a
>> fixture in another fixture. The fixtures resolve dependencies among
>> themselves by means of their named parameters. So that means that if I have
>> a fixture
>>
>> @pytest.fixture
>> def fixture1():
>> return SomeObj()
>>
>> and another fixture
>>
>> @pytest.fixture
>> def fixture2(fixture1)
>> return do_something_with_obj(fixture1)
>>
>> my second fixture will simply receive the object created by the first. As
>> such, you do not need to "call" the second fixture at all. Of course, if
>> you had some use case where you were constructing an object in the second
>> fixture, you could have the first return a class, or you could have it
>> return a function. In fact, I have fixtures in a project that do both. Here
>> they are:
>>
>> @pytest.fixture
>> def func():
>>
>> def foo(x, y, z):
>>
>> return (x + y) * z
>>
>> return foo
>>
>> That's a fixture that returns a function, and any test using the func
>> fixture would receive that actual function as a value, 

Re: Testing ETL with Spark using Pytest

2021-02-09 Thread Mich Talebzadeh
Thanks Jerry for your comments.

The easiest option and I concur is to have all these fixture files
currently under fixtures package lumped together in conftest.py under
* tests* package.

Then you can get away all together from fixtures and it works. However, I
gather plug and play becomes less manageable when you have a large number
of fixtures (large being relative here). My main modules (not tests) are
designed to do ETL from any database that supports JDBC connections (bar
Google BigQuery that only works correctly with Spark API). You specify your
source DB and target DB in yml file for any pluggable JDBC database

Going back to Pytest, please  check this reference below for the reason for
fixtures packaging

How to modularize your py.test fixtures (github.com)


With regard to your other point on fixtures (a fixture in each file), I
have this fixture *loadIntoMysqlTable() *where it uses the data frame
created in* extractHiveData*, reads sample records from Hive and populates
MySql test table. The input needed is the Dataframe that is constructed in
the fixture module extractHiveData which has been passed as parameter to
this. This is the only way it seems to work through my tests


@pytest.fixture(scope = "session")
def extractHiveData():
# read data through jdbc from Hive
spark_session = s.spark_session(ctest['common']['appName'])
tableName = config['GCPVariables']['sourceTable']
fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' +
tableName
   house_df = s.loadTableFromHiveJDBC(spark_session,
fullyQualifiedTableName)
# sample data selected equally n rows from Kensington and Chelsea and n
rows from City of Westminster
num_rows = int(ctest['statics']['read_df_rows']/2)
house_df = house_df.filter(col("regionname") == "Kensington and
Chelsea").limit(num_rows).unionAll(house_df.filter(col("regionname") ==
"City of Westminster").limit(num_rows))
return house_df

@pytest.fixture(scope = "session")
def loadIntoMysqlTable(*extractHiveData*):
try:
*extractHiveData*. \
write. \
format("jdbc"). \
option("url", test_url). \
option("dbtable", ctest['statics']['sourceTable']). \
option("user", ctest['statics']['user']). \
option("password", ctest['statics']['password']). \
option("driver", ctest['statics']['driver']). \
mode(ctest['statics']['mode']). \
save()
return True
except Exception as e:
print(f"""{e}, quitting""")
sys.exit(1)

Thanks again.


Mich


LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 9 Feb 2021 at 15:47, Jerry Vinokurov  wrote:

> Hi Mich,
>
> I'm a bit confused by what you mean when you say that you cannot call a
> fixture in another fixture. The fixtures resolve dependencies among
> themselves by means of their named parameters. So that means that if I have
> a fixture
>
> @pytest.fixture
> def fixture1():
> return SomeObj()
>
> and another fixture
>
> @pytest.fixture
> def fixture2(fixture1)
> return do_something_with_obj(fixture1)
>
> my second fixture will simply receive the object created by the first. As
> such, you do not need to "call" the second fixture at all. Of course, if
> you had some use case where you were constructing an object in the second
> fixture, you could have the first return a class, or you could have it
> return a function. In fact, I have fixtures in a project that do both. Here
> they are:
>
> @pytest.fixture
> def func():
>
> def foo(x, y, z):
>
> return (x + y) * z
>
> return foo
>
> That's a fixture that returns a function, and any test using the func
> fixture would receive that actual function as a value, which could then be
> invoked by calling e.g. func(1, 2, 3). Here's another fixture that's more
> like what you're doing:
>
>
> @pytest.fixture
> def data_frame():
>
> return pd.DataFrame.from_records([(1, 2, 3), (4, 5, 6)], columns=['x', 
> 'y', 'z'])
>
> This one just returns a data frame that can be operated on.
>
> Looking at your setup, I don't want to say that it's wrong per se (it
> could be very appropriate to your specific project to split things up among
> these many files) but I would say that it's not idiomatic usage of pytest
> fixtures, in my experience. It feels to me like you're jumping through a
> lot of hoops to set up something that could be done quite easily and
> compactly in 

Re: Testing ETL with Spark using Pytest

2021-02-09 Thread Jerry Vinokurov
Hi Mich,

I'm a bit confused by what you mean when you say that you cannot call a
fixture in another fixture. The fixtures resolve dependencies among
themselves by means of their named parameters. So that means that if I have
a fixture

@pytest.fixture
def fixture1():
return SomeObj()

and another fixture

@pytest.fixture
def fixture2(fixture1)
return do_something_with_obj(fixture1)

my second fixture will simply receive the object created by the first. As
such, you do not need to "call" the second fixture at all. Of course, if
you had some use case where you were constructing an object in the second
fixture, you could have the first return a class, or you could have it
return a function. In fact, I have fixtures in a project that do both. Here
they are:

@pytest.fixture
def func():

def foo(x, y, z):

return (x + y) * z

return foo

That's a fixture that returns a function, and any test using the func
fixture would receive that actual function as a value, which could then be
invoked by calling e.g. func(1, 2, 3). Here's another fixture that's more
like what you're doing:


@pytest.fixture
def data_frame():

return pd.DataFrame.from_records([(1, 2, 3), (4, 5, 6)],
columns=['x', 'y', 'z'])

This one just returns a data frame that can be operated on.

Looking at your setup, I don't want to say that it's wrong per se (it could
be very appropriate to your specific project to split things up among these
many files) but I would say that it's not idiomatic usage of pytest
fixtures, in my experience. It feels to me like you're jumping through a
lot of hoops to set up something that could be done quite easily and
compactly in conftest.py. I do want to emphasize that there is no
limitation on how fixtures can be used within functions or within other
fixtures (which are also just functions), since the result of the fixture
call is just some Python object.

Hope this helps,
Jerry

On Tue, Feb 9, 2021 at 10:18 AM Mich Talebzadeh 
wrote:

> I was a bit confused with the use of fixtures in Pytest with the
> dataframes passed as an input pipeline from one fixture to another. I wrote
> this after spending some time on it. As usual it is heuristic rather than
> anything overtly by the book so to speak.
>
> In PySpark and PyCharm you can ETTL from Hive to BigQuery or from Oracle
> to Hive etc. However, for PyTest, I decided to use MySql as a database of
> choice for testing with a small sample of data (200 rows). I mentioned
> Fixtures. Simply put "Fixtures are* functions, which will run before each
> test function to which it is applied, to prepare data*. Fixtures are used
> to feed some data to the tests such as database connections". If you have
> ordering like Read data (Extract), do something with it( Transform) and
> save it somewhere (Load), using Spark then these are all happening in
> memory with data frames feeding each other.
>
> The crucial thing to remember is that fixtures pass functions to each
> other as parameters not by invoking them directly!
>
> Example  ## This is correct @pytest.fixture(scope = "session") def
> transformData(readSourceData):  ## fixture passed as parameter # this is
> incorrect (cannot call a fixture in another fixture) read_df =
> readSourceData()  So This operation becomes
>
>  transformation_df = readSourceData. \ select( \ 
>
> Say in PyCharm under tests package, you create a package "fixtures" (just
> a name nothing to do with "fixture") and in there you put your ETL python
> modules that prepare data for you. Example
>
> ### file --> saveData.py @pytest.fixture(scope = "session") def
> saveData(transformData): # Write to test target table try: transformData. \
> write. \ format("jdbc"). \ 
>
>
> You then drive this test by creating a file called *conftest.py *under*
> tests* package. You can then instantiate  your fixture files by
> referencing them in this file as below
>
> import pytest from tests.fixtures.extractHiveData import extractHiveData
> from tests.fixtures.loadIntoMysqlTable import loadIntoMysqlTable from
> tests.fixtures.readSavedData import readSavedData from
> tests.fixtures.readSourceData import readSourceData from
> tests.fixtures.transformData import transformData from
> tests.fixtures.saveData import saveData from tests.fixtures.readSavedData
> import readSavedData
>
> Then you have your test Python file say *test_oracle.py* under package
> tests and then put assertions there
>
> import pytest from src.config import ctest
> @pytest.mark.usefixtures("extractHiveData") def
> test_extract(extractHiveData): assert extractHiveData.count() > 0
> @pytest.mark.usefixtures("loadIntoMysqlTable") def
> test_loadIntoMysqlTable(loadIntoMysqlTable): assert loadIntoMysqlTable
> @pytest.mark.usefixtures("readSavedData") def
> test_readSourceData(readSourceData): assert readSourceData.count() ==
> ctest['statics']['read_df_rows']
> @pytest.mark.usefixtures("transformData") def
> test_transformData(transformData): assert transformData.count() 

Testing ETL with Spark using Pytest

2021-02-09 Thread Mich Talebzadeh
I was a bit confused with the use of fixtures in Pytest with the dataframes
passed as an input pipeline from one fixture to another. I wrote this after
spending some time on it. As usual it is heuristic rather than anything
overtly by the book so to speak.

In PySpark and PyCharm you can ETTL from Hive to BigQuery or from Oracle to
Hive etc. However, for PyTest, I decided to use MySql as a database of
choice for testing with a small sample of data (200 rows). I mentioned
Fixtures. Simply put "Fixtures are* functions, which will run before each
test function to which it is applied, to prepare data*. Fixtures are used
to feed some data to the tests such as database connections". If you have
ordering like Read data (Extract), do something with it( Transform) and
save it somewhere (Load), using Spark then these are all happening in
memory with data frames feeding each other.

The crucial thing to remember is that fixtures pass functions to each other
as parameters not by invoking them directly!

Example  ## This is correct @pytest.fixture(scope = "session") def
transformData(readSourceData):  ## fixture passed as parameter # this is
incorrect (cannot call a fixture in another fixture) read_df =
readSourceData()  So This operation becomes

 transformation_df = readSourceData. \ select( \ 

Say in PyCharm under tests package, you create a package "fixtures" (just a
name nothing to do with "fixture") and in there you put your ETL python
modules that prepare data for you. Example

### file --> saveData.py @pytest.fixture(scope = "session") def
saveData(transformData): # Write to test target table try: transformData. \
write. \ format("jdbc"). \ 


You then drive this test by creating a file called *conftest.py *under*
tests* package. You can then instantiate  your fixture files by referencing
them in this file as below

import pytest from tests.fixtures.extractHiveData import extractHiveData
from tests.fixtures.loadIntoMysqlTable import loadIntoMysqlTable from
tests.fixtures.readSavedData import readSavedData from
tests.fixtures.readSourceData import readSourceData from
tests.fixtures.transformData import transformData from
tests.fixtures.saveData import saveData from tests.fixtures.readSavedData
import readSavedData

Then you have your test Python file say *test_oracle.py* under package
tests and then put assertions there

import pytest from src.config import ctest
@pytest.mark.usefixtures("extractHiveData") def
test_extract(extractHiveData): assert extractHiveData.count() > 0
@pytest.mark.usefixtures("loadIntoMysqlTable") def
test_loadIntoMysqlTable(loadIntoMysqlTable): assert loadIntoMysqlTable
@pytest.mark.usefixtures("readSavedData") def
test_readSourceData(readSourceData): assert readSourceData.count() ==
ctest['statics']['read_df_rows'] @pytest.mark.usefixtures("transformData")
def test_transformData(transformData): assert transformData.count() ==
ctest['statics']['transformation_df_rows']
@pytest.mark.usefixtures("saveData") def test_saveData(saveData): assert
saveData
@pytest.mark.usefixtures("readSavedData")
def test_readSavedData(transformData, readSavedData): assert
readSavedData.subtract(transformData).count() == 0

This is an illustration from PyCharm about directory structure unders tests


[image: image.png]


Let me know your thoughts.


Cheers,


Mich


LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.