Hi,

I have an idea: turn the utilities described below into a Spark package.

Over time, I have developed various Python functions to set Spark
connection parameters and to read from and write to various sources and
sinks. These let us use the existing Spark utility packages from Python
quickly, without worrying about the details. Examples of the contents are
as follows:


   1. Create or get a local Spark session
   2. Create or replace Spark session for a distributed environment
   3. Create Spark context
   4. Create Hive context
   5. Load Spark configuration parameters for Structured Streaming,
   including settings for backpressure, kafka.maxRatePerPartition,
   backpressure.pid.minRate, etc.
   6. Load Spark configuration parameters for Hive
   7. Load Spark configuration parameters for Google BigQuery
   8. Load Spark configuration parameters for Redis
   9. Load data from Google BigQuery into DF
   10. Write data from DF to Google BigQuery
   11. loadTableFromJDBC
   12. writeTableWithJDBC
   13. loadTableFromRedis
   14. writeTableToRedis
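
As a rough illustration of items 1 and 5, the sketch below shows one way
such settings might be applied to a session builder. The name
apply_settings and the values are hypothetical and not taken from the
package; the keys are the standard Spark configuration names that item 5
appears to refer to. The builder argument is assumed to be a pyspark
SparkSession.builder, whose config(key, value) method returns the builder.

```python
# Hypothetical values for illustration; in practice these would come from
# the user's configuration rather than being hard-coded.
STREAMING_SETTINGS = {
    "spark.streaming.backpressure.enabled": "true",
    "spark.streaming.kafka.maxRatePerPartition": "600",
    "spark.streaming.backpressure.pid.minRate": "2000",
}


def apply_settings(builder, settings):
    """Apply each key/value pair to a SparkSession builder and return it."""
    for key, value in settings.items():
        # SparkSession.builder.config(key, value) returns the builder itself
        builder = builder.config(key, value)
    return builder
```

With pyspark installed, a session could then be obtained with
apply_settings(SparkSession.builder.appName("demo"), STREAMING_SETTINGS).getOrCreate().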

All parameter settings are user-driven and can be read from a YAML file
loaded into a Python dictionary, so the approach is quite flexible.

For example, the code to read from JDBC is as follows:

import sys

def loadTableFromJDBC(spark, url, tableName, user, password, driver, fetchsize):
    # Read a table over JDBC into a DataFrame; print the error and exit on failure
    try:
        read_df = (
            spark.read
            .format("jdbc")
            .option("url", url)
            .option("dbtable", tableName)
            .option("user", user)
            .option("password", password)
            .option("driver", driver)
            .option("fetchsize", fetchsize)
            .load()
        )
        return read_df
    except Exception as e:
        print(f"{e}, quitting")
        sys.exit(1)
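
The write side (item 12 in the list) is not shown in this message. A
minimal sketch of what it might look like, mirroring the reader above, is
given here. The signature and the default save mode are assumptions, and
unlike the reader this version simply lets any exception propagate.

```python
def writeTableWithJDBC(df, url, tableName, user, password, driver, mode="append"):
    """Write a DataFrame to a table over JDBC (assumed signature)."""
    (
        df.write
        .format("jdbc")
        .option("url", url)
        .option("dbtable", tableName)
        .option("user", user)
        .option("password", password)
        .option("driver", driver)
        .mode(mode)   # e.g. "append" or "overwrite"
        .save()
    )
```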

and this is how it is called to read from Oracle through JDBC:

from sparkutils import sparkstuff as s

# read data through JDBC from Oracle
tableName = self.config['OracleVariables']['sourceTable']
fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
user = self.config['OracleVariables']['oracle_user']
password = self.config['OracleVariables']['oracle_password']
driver = self.config['OracleVariables']['oracle_driver']
fetchsize = self.config['OracleVariables']['fetchsize']
# oracle_url is defined elsewhere (not shown here)
read_df = s.loadTableFromJDBC(self.spark, oracle_url, fullyQualifiedTableName,
                              user, password, driver, fetchsize)
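
The config dictionary used above could be populated from a YAML file along
these lines. This is only a sketch: it assumes the PyYAML package is
installed, and the function names load_config and jdbc_args are
hypothetical. The section and key names are the ones used in the call
above.

```python
def load_config(path):
    """Read a YAML file into a Python dictionary (requires PyYAML)."""
    import yaml  # assumption: PyYAML is available

    with open(path) as f:
        return yaml.safe_load(f)


def jdbc_args(config, section="OracleVariables"):
    """Collect the table and connection arguments for loadTableFromJDBC."""
    cfg = config[section]
    return {
        "tableName": cfg["dbschema"] + "." + cfg["sourceTable"],
        "user": cfg["oracle_user"],
        "password": cfg["oracle_password"],
        "driver": cfg["oracle_driver"],
        "fetchsize": cfg["fetchsize"],
    }
```

The result can then be passed as keyword arguments, for example
s.loadTableFromJDBC(self.spark, oracle_url, **jdbc_args(self.config)).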

Let me know if this is worth considering and collaborating on.

Cheers

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
