Re: Online classes for spark topics

2023-03-08 Thread Sofia’s World
+1

On Wed, Mar 8, 2023 at 10:40 PM Winston Lai  wrote:

> +1, any webinar on a Spark-related topic is appreciated
>
> Thank You & Best Regards
> Winston Lai
> --
> *From:* asma zgolli 
> *Sent:* Thursday, March 9, 2023 5:43:06 AM
> *To:* karan alang 
> *Cc:* Mich Talebzadeh ; ashok34...@yahoo.com <
> ashok34...@yahoo.com>; User 
> *Subject:* Re: Online classes for spark topics
>
> +1
>
> Le mer. 8 mars 2023 à 21:32, karan alang  a écrit :
>
> +1 .. I'm happy to be part of these discussions as well !
>
>
>
>
> On Wed, Mar 8, 2023 at 12:27 PM Mich Talebzadeh 
> wrote:
>
> Hi,
>
> I guess I can schedule this work over a period of time. I myself can
> contribute as well as learn from others.
>
> So +1 for me.
>
> Let us see if anyone else is interested.
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 8 Mar 2023 at 17:48, ashok34...@yahoo.com 
> wrote:
>
>
> Hello Mich.
>
> Greetings. Would you be able to arrange a Spark Structured Streaming
> learning webinar?
>
> This is something I have been struggling with recently. It would be very
> helpful.
>
> Thanks and Regards
>
> AK
> On Tuesday, 7 March 2023 at 20:24:36 GMT, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Hi,
>
> This might be a worthwhile exercise, on the assumption that the
> contributors will find the time and bandwidth to chip in, so to speak.
>
> I am sure there are many, but off the top of my head I can think of Holden
> Karau for k8s, and Sean Owen for data science. They are both very
> experienced.
>
> Anyone else?
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 7 Mar 2023 at 19:17, ashok34...@yahoo.com.INVALID
>  wrote:
>
> Hello gurus,
>
> Does the Spark community arrange online webinars for special topics like Spark
> on K8s, data science and Spark Structured Streaming?
>
> I would be most grateful if experts could share their experience with
> learners of intermediate knowledge like myself. Hopefully we will find the
> practical experience shared valuable.
>
> Respectfully,
>
> AK
>
>
>
>


Re: Testing ETL with Spark using Pytest

2021-02-09 Thread Sofia’s World
Hey Mich
 my 2 cents on top of Jerry's.
For reusable @pytest.fixture functions across your tests, I'd leverage conftest.py
and put all of them there, if the number is not too big. Otherwise, as you say, you
can create a tests/fixtures package where you place all of them.
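
A minimal conftest.py along those lines might look like the sketch below (the
local-master settings are just an assumption for test runs):

# conftest.py at the root of the tests package; pytest discovers these
# fixtures automatically, no import needed in the test modules.
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # one local session shared by the whole test run
    return (SparkSession.builder
            .master("local[1]")
            .appName("etl-tests")
            .getOrCreate())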

In terms of extractHiveData: for a @fixture it is doing too much.
A fixture in pytest - anyone correct me if I'm wrong - is just an object you can
reuse across tests, something like the example below. It should contain very
minimal code, I'd say not more than a few lines:

@pytest.fixture
def spark():
    return SparkSession.builder.getOrCreate()

def test_mydataframe(spark):
    mydf = spark.table("mypreferredtable")

It seems to me your extractHiveData is doing too much.
IMHO it should be split up, something like:

import io
import pandas as pd
import pytest

@pytest.fixture
def hive_extractor():
    return ...  # build and return your Hive extraction helper here

@pytest.fixture
def default_config():
    return ...  # load and return your config here

def test_extraction_from_hive(spark, hive_extractor, default_config):
    tableName = default_config['GCPVariables']['sourceTable']
    fullyQualifiedTableName = default_config['hiveVariables']['DSDB'] + '.' + tableName
    house_df = hive_extractor.loadTableFromHiveJDBC(spark, fullyQualifiedTableName)
    # To test your dataframe, do something like this
    test_df_pandas = pd.read_csv(io.StringIO("""regionName,col2,col3
Kensington and chelsea,Value2,Value3"""))
    test_df = spark.createDataFrame(test_df_pandas)
    result_df = house_df.subtract(test_df)
    assert result_df.count() == 0

As always, please feel free to disagree. I haven't done much with pytest
fixtures, but this is how I'd restructure it.

hth
 Marco



On Tue, Feb 9, 2021 at 5:37 PM Mich Talebzadeh 
wrote:

> Interesting points Jerry. I do not know how much atomising the unit test
> brings benefit.
>
> For example we have
>
> @pytest.fixture(scope = "session")
> def extractHiveData():
>     # read data through jdbc from Hive
>     spark_session = s.spark_session(ctest['common']['appName'])
>     tableName = config['GCPVariables']['sourceTable']
>     fullyQualifiedTableName = config['hiveVariables']['DSDB'] + '.' + tableName
>     house_df = s.loadTableFromHiveJDBC(spark_session, fullyQualifiedTableName)
>     # sample data: select n rows from Kensington and Chelsea and n rows from City of Westminster
>     num_rows = int(ctest['statics']['read_df_rows']/2)
>     house_df = house_df.filter(col("regionname") == "Kensington and Chelsea").limit(num_rows) \
>         .unionAll(house_df.filter(col("regionname") == "City of Westminster").limit(num_rows))
>     return house_df
>
> Notes:
>
> That spark_session is imported from a package and has been tested many
> times.
>
> The config static values are read through a Python file config.py, which in
> turn reads a yml file config.yml.
>
> The important one to test is house_df, the dataframe read from the
> Hive table. That can fail for a variety of reasons:
>
>
>1. The Hive driver used is old or out of date
>2. The Hive driver does not support kerberized access that may be the
>case in production
>
> So any unit testing is going to be limited in scope. Another point is that
> if the extract-data module fails, you are going to know that by calling it,
> and it can probably be rectified pretty quickly. It is always a question of
> coverage: how much testing needs to be covered.
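
One way to keep such a test's scope narrow is to stub out the JDBC read itself, so the
test exercises only your own sampling logic; a minimal sketch using pytest's monkeypatch
(the sparkutils module and function names follow the snippet above and are assumptions):

import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from sparkutils import sparkstuff as s  # assumed, as in the quoted code

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[1]").appName("unit-tests").getOrCreate()

def test_region_sampling_without_hive(spark, monkeypatch):
    # Replace the JDBC read with a small in-memory dataframe so the test
    # does not depend on Hive drivers or kerberized access.
    fake_df = spark.createDataFrame(
        [("Kensington and Chelsea", 100000), ("City of Westminster", 200000)],
        ["regionname", "averageprice"])
    monkeypatch.setattr(s, "loadTableFromHiveJDBC", lambda session, table: fake_df)

    house_df = s.loadTableFromHiveJDBC(spark, "DSDB.sourceTable")
    sampled = house_df.filter(col("regionname") == "Kensington and Chelsea").limit(1) \
        .unionAll(house_df.filter(col("regionname") == "City of Westminster").limit(1))
    assert sampled.count() == 2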
>
>
> HTH
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 9 Feb 2021 at 16:34, Jerry Vinokurov 
> wrote:
>
>> Sure, I think it makes sense in many cases to break things up like this.
>> Looking at your other example I'd say that you might want to break up
>> extractHiveData into several fixtures (one for session, one for config, one
>> for the df) because in my experience fixtures like those are reused
>> constantly across a test suite. In general I try to keep my fixtures to one
>> concrete task only, so that if I find myself repeating a pattern I just
>> factor it out into another fixture.
>>
>> On Tue, Feb 9, 2021 at 11:14 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thanks Jerry for your comments.
>>>
>>> The easiest option, and I concur, is to have all these fixture files
>>> currently under the fixtures package lumped together in conftest.py under
>>> the *tests* package.
>>>
>>> Then you can do away with the fixtures package altogether and it works. However,
>>> I gather plug-and-play becomes less manageable when you have a large number
>>> of fixtures (large being relative here). My main modules (not tests) are
>>> designed to do ETL from any database that supports JDBC connections (bar
>>> 

Re: Assertion of return value of dataframe in pytest

2021-02-03 Thread Sofia’s World
Hello
 my 2 cents.
Well, that would be an integration test writing to a 'dev' database (which you
might pre-populate and clean up after your runs, so you have repeatable
data).
Then either you
1 - use normal SQL and assert that the values you store in your dataframe
are the same as what you get back from your SQL, or
2 - surely, as there is a dataframe.write, there is also a dataframe.read
that you can use? (see the sketch below)
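
A rough round-trip check along option 2, reading the table back over JDBC and
comparing it with what was written - the oracle_url, config keys and sparkutils
import mirror the quoted code below and are assumptions:

from sparkutils import sparkstuff as s  # assumed, as in the quoted code

def read_back_from_oracle(spark, tableName):
    # mirror of writeTableToOracle, but reading
    return (spark.read.format("jdbc")
            .option("url", oracle_url)
            .option("dbtable", tableName)
            .option("user", config['OracleVariables']['oracle_user'])
            .option("password", config['OracleVariables']['oracle_password'])
            .option("driver", config['OracleVariables']['oracle_driver'])
            .load())

def test_write_round_trip(spark):
    df2 = spark.createDataFrame([(2020, "Kensington and Chelsea", 1500000.0)],
                                ["YEAR", "REGIONNAME", "AVGPRICEPERYEAR"])
    target = config['OracleVariables']['yearlyAveragePricesAllTable']
    s.writeTableToOracle(df2, "overwrite", config['OracleVariables']['dbschema'], target)
    df_read = read_back_from_oracle(spark, target)
    # identical content in both directions means the write round-tripped cleanly
    assert df2.subtract(df_read).count() == 0
    assert df_read.subtract(df2).count() == 0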


hth
 Marco



On Wed, Feb 3, 2021 at 4:51 PM Mich Talebzadeh 
wrote:

> It appears that the following assertion works, assuming that the result set
> can be = 0 (no data) or > 0 (there is data):
>
> assert df2.count() >= 0
>
> However, if I wanted to write to a JDBC database from PySpark through a
> function (already defined in another module) as below
>
>
> def writeTableToOracle(dataFrame, mode, dataset, tableName):
>     try:
>         dataFrame. \
>             write. \
>             format("jdbc"). \
>             option("url", oracle_url). \
>             option("dbtable", tableName). \
>             option("user", config['OracleVariables']['oracle_user']). \
>             option("password", config['OracleVariables']['oracle_password']). \
>             option("driver", config['OracleVariables']['oracle_driver']). \
>             mode(mode). \
>             save()
>     except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
>
> and call it in the program
>
>
> from sparkutils import sparkstuff as s
>
>
> s.writeTableToOracle(df2,"overwrite",config['OracleVariables']['dbschema'],config['OracleVariables']['yearlyAveragePricesAllTable'])
>
>
> How can one assert its validity in PyTest?
>
>
> Thanks again
>
> On Wed, 3 Feb 2021 at 15:12, Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> In Pytest you want to ensure that the composed DF has the correct return value.
>>
>> Example
>>
>> df2 = house_df. \
>>     select( \
>>           F.date_format('datetaken', 'yyyy').cast("Integer").alias('YEAR') \
>>         , 'REGIONNAME' \
>>         , round(F.avg('averageprice').over(wSpecY)).alias('AVGPRICEPERYEAR') \
>>         , round(F.avg('flatprice').over(wSpecY)).alias('AVGFLATPRICEPERYEAR') \
>>         , round(F.avg('TerracedPrice').over(wSpecY)).alias('AVGTERRACEDPRICEPERYEAR') \
>>         , round(F.avg('SemiDetachedPrice').over(wSpecY)).alias('AVGSDPRICEPRICEPERYEAR') \
>>         , round(F.avg('DetachedPrice').over(wSpecY)).alias('AVGDETACHEDPRICEPERYEAR')). \
>>     distinct().orderBy('datetaken', ascending=True)
>>
>> Will that be enough to run just this command
>>
>>   assert not []
>>
>> I believe that may be flawed because any error will be assumed to be NOT
>> NULL?
>>
>> Thanks
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Re: Using Lambda function to generate random data in PySpark throws not defined error

2020-12-13 Thread Sofia’s World
Hey Mich
 glad to know you got to the bottom of it.
In Python, if you want to run a module - same as you would in Java/Scala - you
have to define a main() entry point.
You'll notice that the snippet I sent you had this syntax:
if __name__ == "__main__":
    main()

I am guessing you just chose an unfortunate name for your class. Had you
called it

class pincopallino:
    ...

your IDE would not have run it, because it could not find a main method,
and then you would have been on the right track.
I am guessing your main() class somehow confused your IDE.
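
For reference, the usual shape of a runnable PySpark module is a plain function
plus the __main__ guard; a minimal sketch (the app name is just a placeholder):

from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.appName("runner").getOrCreate()
    spark.range(5).show()
    spark.stop()

if __name__ == "__main__":
    main()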

The best way to run your Spark code would be via a unit test though; the
code below might give you a head start (you'll need to configure your IDE
for this though..)

have fun
kr
 marco

import pytest
from pyspark.sql import SparkSession

@pytest.fixture
def spark_session():
    return SparkSession.builder \
        .master('local[1]') \
        .appName('SparkByExamples.com') \
        .getOrCreate()


def test_create_table(spark_session):
    df = spark_session.createDataFrame([['one', 'two']]).toDF(*['first', 'second'])
    df.show()

    df2 = spark_session.createDataFrame([['one', 'two']]).toDF(*['first', 'second'])

    df.createOrReplaceTempView('sample')

    # identical rows in both dataframes, so the difference is empty
    assert df.subtract(df2).count() == 0

On Sun, Dec 13, 2020 at 8:43 PM Mich Talebzadeh 
wrote:

>
> Thanks all.
>
> Found out the problem :(
>
> I defined the runner.py as
>
> class main()
>
> I replaced it with
>
> def main():
>
> and it worked without declaring numRows as global.
>
> I am still wondering the reason for it working with def main()?
>
>
> regards,
>
> Mich
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 13 Dec 2020 at 15:10, Sean Owen  wrote:
>
>> I don't believe you'll be able to use globals in a Spark task, as they
>> won't exist on the remote executor machines.
>>
>> On Sun, Dec 13, 2020 at 3:46 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> thanks Marco.
>>>
>>> When I stripped down spark etc and ran your map, it came back OK (no
>>> errors) WITHOUT global numRows
>>>
>>> However, with full code, this is the unresolved reference notification I
>>> am getting as attached embedded your code WITHOUT global numRows
>>>
>>> regards,
>>>
>>>
>>> Mich
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sat, 12 Dec 2020 at 21:48, Sofia’s World  wrote:
>>>
>>>> Hi Mich
>>>>  i dont think it's a good idea...  I believe your IDE is playing tricks
>>>> on you.
>>>> Take spark out of the equation this is a python issue only.
>>>> i am  guessing your IDE is somehow messing up your environment.
>>>>
>>>> if you take out the whole spark code and replace it by this code
>>>>
>>>> map(lambda x: (x, uf.clustered(x,numRows), \
>>>>uf.scattered(x,numRows), \
>>>>uf.randomised(x, numRows), \
>>>>uf.randomString(50), \
>>>>uf.padString(x," ",50), \
>>>>uf.padSingleChar("x",4000)), [1,2,3,4,5])
>>>>
>>>> you should get exactly the same error...
>>>>
>>>> Send me a zip with the tfconstants,py and a trimmed donw version of
>>>> your main,py and i'll plug it in my IDE and see if i can reproduce
>>>> It worked fine in  Jupyter, but then i have all functins in same
>>>> notebook
>>>> hth
>>>>  marco
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>

Re: Using Lambda function to generate random data in PySpark throws not defined error

2020-12-13 Thread Sofia’s World
Sure Mich... uhm... let me try to run your code in my IDE. I'm intrigued
by the error.
Will report back whether I find something or not.
Kind regards

On Sun, Dec 13, 2020, 9:46 AM Mich Talebzadeh 
wrote:

> thanks Marco.
>
> When I stripped down spark etc and ran your map, it came back OK (no
> errors) WITHOUT global numRows
>
> However, with full code, this is the unresolved reference notification I
> am getting as attached embedded your code WITHOUT global numRows
>
> regards,
>
>
> Mich
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 12 Dec 2020 at 21:48, Sofia’s World  wrote:
>
>> Hi Mich
>>  i dont think it's a good idea...  I believe your IDE is playing tricks
>> on you.
>> Take spark out of the equation this is a python issue only.
>> i am  guessing your IDE is somehow messing up your environment.
>>
>> if you take out the whole spark code and replace it by this code
>>
>> map(lambda x: (x, uf.clustered(x,numRows), \
>>uf.scattered(x,numRows), \
>>uf.randomised(x, numRows), \
>>uf.randomString(50), \
>>uf.padString(x," ",50), \
>>uf.padSingleChar("x",4000)), [1,2,3,4,5])
>>
>> you should get exactly the same error...
>>
>> Send me a zip with the tfconstants,py and a trimmed donw version of your
>> main,py and i'll plug it in my IDE and see if i can reproduce
>> It worked fine in  Jupyter, but then i have all functins in same notebook
>> hth
>>  marco
>>
>> On Sat, Dec 12, 2020 at 9:02 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> I solved the issue of the variable numRows not being defined within the lambda
>>> function by defining it as a global variable
>>>
>>> global numRows
>>> numRows = 10   ## do in increment of 50K rows otherwise you blow up driver 
>>> memory!
>>> #
>>>
>>> Then I could call it within the lambda function as follows
>>>
>>>
>>> rdd = sc.parallelize(Range). \
>>>  map(lambda x: (x, uf.clustered(x,numRows), \
>>>uf.scattered(x,numRows), \
>>>uf.randomised(x, numRows), \
>>>uf.randomString(50), \
>>>uf.padString(x," ",50), \
>>>uf.padSingleChar("x",4000)))
>>>
>>> This then worked. I am not convinced this is *the correct* solution but
>>> somehow it worked.
>>>
>>>
>>> Thanks
>>>
>>>
>>> Mich
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 11 Dec 2020 at 18:52, Mich Talebzadeh 
>>> wrote:
>>>
>>>> many thanks KR.
>>>>
>>>> If i call the clusterted function on its own it works
>>>>
>>>> numRows = 10
>>>>
>>>> print(uf.clustered(200,numRows))
>>>>
>>>> and returns
>>>>
>>>> 0.00199
>>>> If I run it all in one, including the UsedFunctions class in the same py
>>>> file, it works. The code is attached.
>>>>
>>>> However, in PyCharm, I do the following
>>>>
>>>> UsedFunctions.py. Note that this file only contains functions and no
>>>> class
>>>>
>>>> import logging
>>>> import random
>>>> import string
>>>> import math
>>>>
>>>> def randomString(length):
>>>> letters = string.ascii_letters
>>>> result_str = ''.join(random.choice(letters) for i in range(lengt

Re: Using Lambda function to generate random data in PySpark throws not defined error

2020-12-12 Thread Sofia’s World
>>   settings = [
>>
>> ("hive.exec.dynamic.partition", "true"),
>>
>> ("hive.exec.dynamic.partition.mode", "nonstrict"),
>>
>> ("spark.sql.orc.filterPushdown", "true"),
>>
>> ("hive.msck.path.validation", "ignore"),
>>
>> ("spark.sql.caseSensitive", "true"),
>>
>> ("spark.speculation", "false"),
>>
>> ("hive.metastore.authorization.storage.checks", "false"),
>>
>> ("hive.metastore.client.connect.retry.delay", "5s"),
>>
>> ("hive.metastore.client.socket.timeout", "1800s"),
>>
>> ("hive.metastore.connect.retries", "12"),
>>
>> ("hive.metastore.execute.setugi", "false"),
>>
>> ("hive.metastore.failure.retries", "12"),
>>
>> ("hive.metastore.schema.verification", "false"),
>>
>> ("hive.metastore.schema.verification.record.version",
>> "false"),
>>
>> ("hive.metastore.server.max.threads", "10"),
>>
>> ("hive.metastore.authorization.storage.checks",
>> "/apps/hive/warehouse")
>>
>> ]
>>
>>   configs = {"DB":"pycharm",
>>
>>"tableName":"randomDataPy"}
>>
>>   DB = "pycharm"
>>
>>   tableName = "randomDataPy"
>>
>>   fullyQualifiedTableName = DB +"."+tableName
>>
>>   spark = SparkSession.builder \
>>
>>   .appName("app1") \
>>
>>   .enableHiveSupport() \
>>
>>   .getOrCreate()
>>
>>
>>   spark.sparkContext._conf.setAll(settings)
>>
>>
>>   sc = SparkContext.getOrCreate()
>>
>>   print(sc.getConf().getAll())
>>
>>   sqlContext = SQLContext(sc)
>>
>>   HiveContext = HiveContext(sc)
>>
>>   lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
>> HH:mm:ss.ss') ")).collect()
>>
>>   print("\nStarted at");uf.println(lst)
>>
>>
>>   numRows = 10   ## do in increment of 50K rows otherwise you blow up
>> driver memory!
>>
>>   #
>>
>>   ## Check if table exist otherwise create it
>>
>>
>>   rows = 0
>>
>>   sqltext  = ""
>>
>>   if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
>>
>> rows = spark.sql(f"""SELECT COUNT(1) FROM
>> {fullyQualifiedTableName}""").collect()[0][0]
>>
>> print ("number of rows is ",rows)
>>
>>   else:
>>
>> print(f"\nTable {fullyQualifiedTableName} does not exist, creating
>> table ")
>>
>> sqltext = """
>>
>> CREATE TABLE {DB}.{tableName}(
>>
>> ID INT
>>
>> , CLUSTERED INT
>>
>> , SCATTERED INT
>>
>> , RANDOMISED INT
>>
>> , RANDOM_STRING VARCHAR(50)
>>
>> , SMALL_VC VARCHAR(50)
>>
>> , PADDING  VARCHAR(4000)
>>
>> )
>>
>> STORED AS PARQUET
>>
>> """
>>
>> spark.sql(sqltext)
>>
>>
>>   start = 0
>>
>>   if (rows == 0):
>>
>> start = 1
>>
>>   else:
>>
>> maxID = spark.sql(f"SELECT MAX(id) FROM
>> {fullyQualifiedTableName}").collect()[0][0]
>>
>> start = maxID + 1
>>
>> end = start + numRows - 1
>>
>>   print ("starting at ID = ",start, ",ending on = ",end)
>>
>>   Range = range(start, end+1)
>>
>>   ## This traverses through the Range and increment "x" by one unit each
>> time, and that x value is used in the code to generate random data through
>> Python functions in a class
>>
>>   print(numRows)
>>
>>   print(uf.clustered(200,numRows))
>>
>>   rdd = sc.parallelize(Range). \
>>
>>map(lambda x: (x, uf.clustered(x, numRows), \
>>
>>  uf.scattered(x,1), \
>>
>>

Re: Using Lambda function to generate random data in PySpark throws not defined error

2020-12-11 Thread Sofia’s World
Copying and pasting your code in a Jupyter notebook works fine - that is,
using my own version of Range, which is simply a list of numbers.

How about this - does this work fine?
list(map(lambda x: (x, clustered(x, numRows)), [1, 2, 3, 4]))

If it does, I'd look at what's inside your Range and what you get out of
it. I suspect something is wrong in there.
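
For reference, a self-contained version of that check - with a stub standing in
for uf.clustered, whose real formula is not shown here, so this is an assumption -
runs without any NameError, since the lambda simply closes over numRows from the
enclosing scope:

def clustered(x, num_rows):
    # stub with roughly the shape of uf.clustered: scales x into [0, 1)
    return round((x - 1) / num_rows, 5)

numRows = 10
rows = list(map(lambda x: (x, clustered(x, numRows)), [1, 2, 3, 4]))
print(rows)  # [(1, 0.0), (2, 0.1), (3, 0.2), (4, 0.3)]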

If there were something wrong with the clustered function, then you should be
able to take it out of the map() and still have the code working.
Could you try that as well?
kr


On Fri, Dec 11, 2020 at 5:04 PM Mich Talebzadeh 
wrote:

> Sorry, part of the code is not that visible
>
> rdd = sc.parallelize(Range). \
>map(lambda x: (x, uf.clustered(x, numRows), \
>  uf.scattered(x,1), \
>  uf.randomised(x,1), \
>  uf.randomString(50), \
>  uf.padString(x," ",50), \
>  uf.padSingleChar("x",4000)))
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 11 Dec 2020 at 16:56, Mich Talebzadeh 
> wrote:
>
>> Thanks Sean,
>>
>> This is the code
>>
>> numRows = 10   ## do in increment of 50K rows otherwise you blow up 
>> driver memory!
>> #
>> ## Check if table exist otherwise create it
>>
>>
>> rows = 0
>> sqltext  = ""
>> if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
>>   rows = spark.sql(f"""SELECT COUNT(1) FROM 
>> {fullyQualifiedTableName}""").collect()[0][0]
>>   print ("number of rows is ",rows)
>> else:
>>   print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
>>   sqltext = """
>>   CREATE TABLE {DB}.{tableName}(
>>   ID INT
>>   , CLUSTERED INT
>>   , SCATTERED INT
>>   , RANDOMISED INT
>>   , RANDOM_STRING VARCHAR(50)
>>   , SMALL_VC VARCHAR(50)
>>   , PADDING  VARCHAR(4000)
>>   )
>>   STORED AS PARQUET
>>   """
>>   spark.sql(sqltext)
>>
>> start = 0
>> if (rows == 0):
>>   start = 1
>> else:
>>   maxID = spark.sql(f"SELECT MAX(id) FROM 
>> {fullyQualifiedTableName}").collect()[0][0]
>>   start = maxID + 1
>>   end = start + numRows - 1
>> print ("starting at ID = ",start, ",ending on = ",end)
>> Range = range(start, end+1)
>> ## This traverses through the Range and increment "x" by one unit each time, 
>> and that x value is used in the code to generate random data through Python 
>> functions in a class
>> print(numRows)
>> print(uf.clustered(200,numRows))
>> rdd = sc.parallelize(Range). \
>>  map(lambda x: (x, uf.clustered(x, numRows), \
>>uf.scattered(x,1), \
>>uf.randomised(x,1), \
>>uf.randomString(50), \
>>uf.padString(x," ",50), \
>>uf.padSingleChar("x",4000)))
>> df = rdd.toDF(). \
>>  withColumnRenamed("_1","ID"). \
>>  withColumnRenamed("_2", "CLUSTERED"). \
>>  withColumnRenamed("_3", "SCATTERED"). \
>>  withColumnRenamed("_4", "RANDOMISED"). \
>>  withColumnRenamed("_5", "RANDOM_STRING"). \
>>  withColumnRenamed("_6", "SMALL_VC"). \
>>  withColumnRenamed("_7", "PADDING")
>>
>>
>> And this is the run with error
>>
>>
>> Started at
>>
>> 11/12/2020 14:42:45.45
>>
>> number of rows is  450
>>
>> starting at ID =  451 ,ending on =  460
>>
>> 10
>>
>> 0.00199
>>
>> 20/12/11 14:42:56 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID
>> 33)
>>
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>>
>>   File
>> "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py",
>> line 605, in main
>>
>>   File
>> "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py",
>> line 597, in process
>>
>>   File
>> "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py",
>> line 271, in dump_stream
>>
>> vs = list(itertools.islice(iterator, batch))
>>
>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1440,
>> in takeUpToNumLeft
>>
>> yield next(iterator)
>>
>>   File
>> "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line
>> 107, in wrapper
>>
>> return f(*args, **kwargs)
>>
>>   File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py",
>> line 101, in 
>>
>> map(lambda x: (x, uf.clustered(x, numRows), \
>>
>> NameError: name 'numRows' is not defined
>>
>> Regards,
>>
>> Mich
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. 

Re: Need Unit test complete reference for Pyspark

2020-11-19 Thread Sofia’s World
Hey
 They are good libraries to get you started. I have used both of them;
unfortunately - as far as I saw when I started to use them - only a few
people maintain them.
But you can get pointers out of them for writing tests, and the code below
can get you started.
What you'll need is:

- a method to create a dataframe on the fly, perhaps from a string; have a
look at pandas, which has methods for this (see the sketch below)
- a method to test dataframe equality; you can use df1.subtract(df2)
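
A minimal sketch of the first point, assuming pandas is installed alongside
PySpark (the column names are made up):

import io
import pandas as pd

def df_from_csv_string(spark, csv_text):
    # build a small Spark dataframe from an inline CSV string via pandas
    return spark.createDataFrame(pd.read_csv(io.StringIO(csv_text)))

# usage inside a test:
# expected_df = df_from_csv_string(spark_session, """regionname,averageprice
# Kensington and Chelsea,1500000
# City of Westminster,1200000""")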

I am assuming you are into dataframes rather than RDDs; for RDDs, the two
packages you mention should have everything you need.

hth
 marco


import pytest
from pyspark.sql import SparkSession

@pytest.fixture
def spark_session():
    return SparkSession.builder \
        .master('local[1]') \
        .appName('SparkByExamples.com') \
        .getOrCreate()


def test_create_table(spark_session):
    df = spark_session.createDataFrame([['one', 'two']]).toDF(*['first', 'second'])
    df.show()

    df2 = spark_session.createDataFrame([['one', 'two']]).toDF(*['first', 'second'])

    assert df.subtract(df2).count() == 0




On Thu, Nov 19, 2020 at 6:38 AM Sachit Murarka 
wrote:

> Hi Users,
>
> I have to write Unit Test cases for PySpark.
> I think pytest-spark and "spark testing base" are good test libraries.
>
> Can anyone please provide full reference for writing the test cases in
> Python using these?
>
> Kind Regards,
> Sachit Murarka
>


Re: Scala vs Python for ETL with Spark

2020-10-23 Thread Sofia’s World
Hey
 My 2 cents on CI/CD for PySpark. You can leverage pytest + Holden Karau's
spark-testing-base libs for CI, thus giving you `almost` the same functionality
as Scala - I say almost because in Scala you have nice and descriptive FunSpecs.

For me the choice is based on expertise. Having worked with teams which are 99%
Python, the cost of retraining - or even hiring - is too big, especially if
you have an existing project and aggressive deadlines.
Please feel free to object.
Kind Regards

On Fri, Oct 23, 2020, 1:01 PM William R  wrote:

> It's really a very big discussion around PySpark vs Scala. I have a little
> bit of experience with how to automate CI/CD when it's a JVM-based
> language.
> I would like to take this as an opportunity to understand the end-to-end
> CI/CD flow for PySpark-based ETL pipelines.
>
> Could someone please list the steps for how the pipeline automation works
> when it comes to PySpark-based pipelines in production?
>
> //William
>
> On Fri, Oct 23, 2020 at 11:24 AM Wim Van Leuven <
> wim.vanleu...@highestpoint.biz> wrote:
>
>> I think Sean is right, but in your argumentation you mention that 
>> 'functionality
>> is sacrificed in favour of the availability of resources'. That's where I
>> disagree with you but agree with Sean. That is mostly not true.
>>
>> In your previous posts you also mentioned this. The only reason we
>> sometimes have to bail out to Scala is for performance with certain UDFs.
>>
>> On Thu, 22 Oct 2020 at 23:11, Mich Talebzadeh 
>> wrote:
>>
>>> Thanks for the feedback Sean.
>>>
>>> Kind regards,
>>>
>>> Mich
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 22 Oct 2020 at 20:34, Sean Owen  wrote:
>>>
 I don't find this trolling; I agree with the observation that 'the
 skills you have' are a valid and important determiner of what tools you
 pick.
 I disagree that you just have to pick the optimal tool for everything.
 Sounds good until that comes in contact with the real world.
 For Spark, Python vs Scala just doesn't matter a lot, especially if
 you're doing DataFrame operations. By design. So I can't see there being
 one answer to this.

 On Thu, Oct 22, 2020 at 2:23 PM Gourav Sengupta <
 gourav.sengu...@gmail.com> wrote:

> Hi Mich,
>
> this is turning into a troll now, can you please stop this?
>
> No one uses Scala where Python should be used, and no one uses Python
> where Scala should be used - it all depends on requirements. Everyone
> understands polyglot programming and how to use relevant technologies best
> to their advantage.
>
>
> Regards,
> Gourav Sengupta
>
>
>>>
>
> --
> Regards,
> William R
> +919037075164
>
>
>