Sure Mich... let me try to run your code in my IDE. I'm intrigued
by the error.
I'll report back whether or not I find something.
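
One guess in the meantime, and it is only a guess until I have run your code: in the full listing numRows is assigned inside the body of class main, and a lambda defined in a class body does not see names from that class body. Python looks up the lambda's free names in the module globals instead, which would also explain why "global numRows" made the error go away. Below is a minimal sketch without Spark; the function names and the tiny input list are made up for illustration.

```python
import math


def clustered(x, num_rows):
    # Same formula as uf.clustered in the thread.
    return math.floor(x - 1) / num_rows


def lambda_in_class_body():
    """num_rows lives only in the class body; the lambda cannot see it."""
    try:
        class Main:
            num_rows = 10
            rows = list(map(lambda x: (x, clustered(x, num_rows)), [1, 2, 3]))
        return Main.rows
    except NameError as exc:
        return f"NameError: {exc}"


def lambda_with_module_global():
    """Roughly what `global numRows` does: bind the name at module level."""
    class Main:
        global shared_rows
        shared_rows = 10
        rows = list(map(lambda x: (x, clustered(x, shared_rows)), [1, 2, 3]))
    return Main.rows


print(lambda_in_class_body())       # reports the NameError
print(lambda_with_module_global())  # works, because the name is now a module global
```

If this fails the same way on your side, the IDE is probably not the culprit.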
Kind regards

On Sun, Dec 13, 2020, 9:46 AM Mich Talebzadeh <> wrote:

> Thanks Marco.
> When I stripped out Spark etc. and ran your map, it came back OK (no
> errors) WITHOUT global numRows.
> However, with the full code I get the unresolved reference notification
> shown in the attachment, again with your code embedded WITHOUT global numRows.
> regards,
> Mich
> On Sat, 12 Dec 2020 at 21:48, Sofia’s World <> wrote:
>> Hi Mich
>> I don't think it's a good idea... I believe your IDE is playing tricks
>> on you.
>> Take Spark out of the equation... this is a Python-only issue.
>> I am guessing your IDE is somehow messing up your environment.
>> If you take out the whole Spark code and replace it with this code
>> map(lambda x: (x, uf.clustered(x,numRows), \
>>                            uf.scattered(x,numRows), \
>>                            uf.randomised(x, numRows), \
>>                            uf.randomString(50), \
>>                            uf.padString(x," ",50), \
>>                            uf.padSingleChar("x",4000)), [1,2,3,4,5])
>> you should get exactly the same error...
>> Send me a zip with tfconstants.py and a trimmed-down version of your
>> main.py and I'll plug it into my IDE and see if I can reproduce it.
>> It worked fine in Jupyter, but then I have all the functions in the same notebook.
>> hth
>>  marco
>> On Sat, Dec 12, 2020 at 9:02 PM Mich Talebzadeh <> wrote:
>>> I solved the issue of the variable numRows not being defined within the
>>> lambda function by declaring it as a global variable:
>>> global numRows
>>> numRows = 10   ## do in increment of 50K rows otherwise you blow up driver memory!
>>> #
>>> Then I could call it within the lambda function as follows:
>>> rdd = sc.parallelize(Range). \
>>>          map(lambda x: (x, uf.clustered(x,numRows), \
>>>                            uf.scattered(x,numRows), \
>>>                            uf.randomised(x, numRows), \
>>>                            uf.randomString(50), \
>>>                            uf.padString(x," ",50), \
>>>                            uf.padSingleChar("x",4000)))
>>> This then worked. I am not convinced this is *the correct* solution but
>>> somehow it worked.
>>> Thanks
>>> Mich
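
On the doubt above about whether "global" is the right fix: here is a sketch of two alternatives that avoid a module-level global, assuming the cause is that numRows is assigned in a class body where the lambda cannot see it. The name build_rows and the sample values are hypothetical; with Spark, the same lambda would be passed to the RDD's map instead of the built-in map.

```python
import math


def clustered(x, num_rows):
    # Same formula as uf.clustered in the thread.
    return math.floor(x - 1) / num_rows


def build_rows(values, num_rows):
    # num_rows is a local of this function, so the lambda closes over it.
    return list(map(lambda x: (x, clustered(x, num_rows)), values))


class Main:
    num_rows = 10
    # Default arguments are evaluated in the class body, where num_rows is visible.
    rows = list(map(lambda x, n=num_rows: (x, clustered(x, n)), [1, 2, 3]))


print(build_rows([1, 2, 3], 10))
print(Main.rows)
```

Moving the driver code out of the class body and into a plain function is probably the less surprising of the two, since every name the lambda uses is then an ordinary local or global.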
>>> On Fri, 11 Dec 2020 at 18:52, Mich Talebzadeh <> wrote:
>>>> many thanks KR.
>>>> If I call the clustered function on its own it works
>>>> numRows = 100000
>>>> print(uf.clustered(200,numRows))
>>>> and returns
>>>> 0.00199
>>>> If I run it all in one, including the UsedFunctions class in the same .py
>>>> file, it works. The code is attached.
>>>> However, in PyCharm, I do the following
>>>> Note that this file only contains functions and no
>>>> class
>>>> import logging
>>>> import random
>>>> import string
>>>> import math
>>>> def randomString(length):
>>>>     letters = string.ascii_letters
>>>>     result_str = ''.join(random.choice(letters) for i in range(length))
>>>>     return result_str
>>>> def clustered(x,numRows):
>>>>     return math.floor(x -1)/numRows
>>>> def scattered(x,numRows):
>>>>     return abs((x -1 % numRows))* 1.0
>>>> def randomised(seed,numRows):
>>>>     random.seed(seed)
>>>>     return abs(random.randint(0, numRows) % numRows) * 1.0
>>>> def padString(x,chars,length):
>>>>     n = int(math.log10(x) + 1)
>>>>     result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
>>>>     return result_str
>>>> def padSingleChar(chars,length):
>>>>     result_str = ''.join(chars for i in range(length))
>>>>     return result_str
>>>> def println(lst):
>>>>     for ll in lst:
>>>>       print(ll[0])
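
A side note on the helper functions quoted above, separate from the NameError: in Python, % binds tighter than binary -, so "x -1 % numRows" is evaluated as x - (1 % numRows), which is just x - 1 for any numRows greater than 1. If the intent was to wrap the value into the range 0 to numRows - 1 (an assumption; the thread does not say), the subtraction needs its own parentheses. scattered_wrapped is a hypothetical name used only for this comparison.

```python
def scattered(x, num_rows):
    # As written in the thread: parsed as x - (1 % num_rows).
    return abs((x - 1 % num_rows)) * 1.0


def scattered_wrapped(x, num_rows):
    # Hypothetical variant, assuming the intent was (x - 1) modulo num_rows.
    return abs((x - 1) % num_rows) * 1.0


print(scattered(25, 10))          # 24.0
print(scattered_wrapped(25, 10))  # 4.0
```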
>>>> In the  I have this code which is failing
>>>> from pyspark import SparkContext, SparkConf
>>>> from pyspark.sql import SQLContext
>>>> from pyspark.sql import HiveContext
>>>> from pyspark.sql import SparkSession
>>>> from pyspark.sql import Row
>>>> from pyspark.sql.types import StringType, ArrayType
>>>> from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
>>>>     add_months
>>>> from datetime import datetime, timedelta
>>>> import os
>>>> from os.path import join, abspath
>>>> from typing import Optional
>>>> import logging
>>>> import random
>>>> import string
>>>> import math
>>>> import mathOperations as mo
>>>> import UsedFunctions as uf
>>>> ##import test_oracle as to
>>>> class main:
>>>>   rec = {}
>>>>   settings = [
>>>>                 ("hive.exec.dynamic.partition", "true"),
>>>>                 ("hive.exec.dynamic.partition.mode", "nonstrict"),
>>>>                 ("spark.sql.orc.filterPushdown", "true"),
>>>>                 ("hive.msck.path.validation", "ignore"),
>>>>                 ("spark.sql.caseSensitive", "true"),
>>>>                 ("spark.speculation", "false"),
>>>>                 ("",
>>>> "false"),
>>>>                 ("hive.metastore.client.connect.retry.delay", "5s"),
>>>>                 ("hive.metastore.client.socket.timeout", "1800s"),
>>>>                 ("hive.metastore.connect.retries", "12"),
>>>>                 ("hive.metastore.execute.setugi", "false"),
>>>>                 ("hive.metastore.failure.retries", "12"),
>>>>                 ("hive.metastore.schema.verification", "false"),
>>>>                 ("hive.metastore.schema.verification.record.version",
>>>> "false"),
>>>>                 ("hive.metastore.server.max.threads", "100000"),
>>>>                 ("",
>>>> "/apps/hive/warehouse")
>>>> ]
>>>>   configs = {"DB":"pycharm",
>>>>            "tableName":"randomDataPy"}
>>>>   DB = "pycharm"
>>>>   tableName = "randomDataPy"
>>>>   fullyQualifiedTableName = DB +"."+tableName
>>>>   spark = SparkSession.builder \
>>>>           .appName("app1") \
>>>>           .enableHiveSupport() \
>>>>           .getOrCreate()
>>>>   spark.sparkContext._conf.setAll(settings)
>>>>   sc = SparkContext.getOrCreate()
>>>>   print(sc.getConf().getAll())
>>>>   sqlContext = SQLContext(sc)
>>>>   HiveContext = HiveContext(sc)
>>>>   lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy ') ")).collect()
>>>>   print("\nStarted at");uf.println(lst)
>>>>   numRows = 100000   ## do in increment of 50K rows otherwise you blow up driver memory!
>>>>   #
>>>>   ## Check if table exist otherwise create it
>>>>   rows = 0
>>>>   sqltext  = ""
>>>>   if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
>>>>     rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
>>>>     print ("number of rows is ",rows)
>>>>   else:
>>>>     print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
>>>>     ## f-string so that {DB} and {tableName} are substituted
>>>>     sqltext = f"""
>>>>     CREATE TABLE {DB}.{tableName}(
>>>>     ID INT
>>>>     , CLUSTERED INT
>>>>     , SCATTERED INT
>>>>     , SMALL_VC VARCHAR(50)
>>>>     , PADDING  VARCHAR(4000)
>>>>     )
>>>>     """
>>>>     spark.sql(sqltext)
>>>>   start = 0
>>>>   if (rows == 0):
>>>>     start = 1
>>>>   else:
>>>>     maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
>>>>     start = maxID + 1
>>>>   ## end must be set on both branches, otherwise it is undefined when rows == 0
>>>>   end = start + numRows - 1
>>>>   print ("starting at ID = ",start, ",ending on = ",end)
>>>>   Range = range(start, end+1)
>>>>   ## This traverses through the Range and increments "x" by one unit
>>>>   ## each time, and that x value is used in the code to generate random data
>>>>   ## through Python functions in a class
>>>>   print(numRows)
>>>>   print(uf.clustered(200,numRows))
>>>>   rdd = sc.parallelize(Range). \
>>>>            map(lambda x: (x, uf.clustered(x, numRows), \
>>>>                              uf.scattered(x,10000), \
>>>>                              uf.randomised(x,10000), \
>>>>                              uf.randomString(50), \
>>>>                              uf.padString(x," ",50), \
>>>>                              uf.padSingleChar("x",4000)))
>>>>   df = rdd.toDF(). \
>>>>        withColumnRenamed("_1","ID"). \
>>>>        withColumnRenamed("_2", "CLUSTERED"). \
>>>>        withColumnRenamed("_3", "SCATTERED"). \
>>>>        withColumnRenamed("_4", "RANDOMISED"). \
>>>>        withColumnRenamed("_5", "RANDOM_STRING"). \
>>>>        withColumnRenamed("_6", "SMALL_VC"). \
>>>>        withColumnRenamed("_7", "PADDING")
>>>>   df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
>>>>   df.printSchema()
>>>>   df.explain()
>>>>   df.createOrReplaceTempView("tmp")
>>>>   sqltext = f"""
>>>>     INSERT INTO TABLE {fullyQualifiedTableName}
>>>>     SELECT
>>>>             ID
>>>>           , CLUSTERED
>>>>           , SCATTERED
>>>>           , RANDOMISED
>>>>           , RANDOM_STRING
>>>>           , SMALL_VC
>>>>           , PADDING
>>>>     FROM tmp
>>>>     """
>>>>   spark.sql(sqltext)
>>>>   spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM {fullyQualifiedTableName}").show(n=20,truncate=False,vertical=False)
>>>>   ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
>>>>   lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy ') ")).collect()
>>>>   print("\nFinished at");uf.println(lst)
>>>> On Fri, 11 Dec 2020 at 18:04, Sofia’s World <> wrote:
>>>>> Copying and pasting your code into a Jupyter notebook works fine, that
>>>>> is, using my own version of Range, which is simply a list of numbers.
>>>>> How about this... does this work fine?
>>>>> list(map(lambda x: (x, clustered(x, numRows)),[1,2,3,4]))
>>>>> If it does, I'd look at what's inside your Range and what you get out
>>>>> of it. I suspect something is wrong in there.
>>>>> If there was something wrong with the clustered function, then you should be
>>>>> able to take it out of the map() and still have the code working.
>>>>> Could you try that as well?
>>>>> kr
>>>>> On Fri, Dec 11, 2020 at 5:04 PM Mich Talebzadeh <> wrote:
>>>>>> Sorry, part of the code is not that visible
>>>>>> rdd = sc.parallelize(Range). \
>>>>>>            map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>>                              uf.scattered(x,10000), \
>>>>>>                              uf.randomised(x,10000), \
>>>>>>                              uf.randomString(50), \
>>>>>>                              uf.padString(x," ",50), \
>>>>>>                              uf.padSingleChar("x",4000)))
>>>>>> On Fri, 11 Dec 2020 at 16:56, Mich Talebzadeh <> wrote:
>>>>>>> Thanks Sean,
>>>>>>> This is the code
>>>>>>> numRows = 100000   ## do in increment of 50K rows otherwise you blow up driver memory!
>>>>>>> #
>>>>>>> ## Check if table exist otherwise create it
>>>>>>> rows = 0
>>>>>>> sqltext  = ""
>>>>>>> if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
>>>>>>>   rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
>>>>>>>   print ("number of rows is ",rows)
>>>>>>> else:
>>>>>>>   print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
>>>>>>>   ## f-string so that {DB} and {tableName} are substituted
>>>>>>>   sqltext = f"""
>>>>>>>   CREATE TABLE {DB}.{tableName}(
>>>>>>>   ID INT
>>>>>>>   , CLUSTERED INT
>>>>>>>   , SCATTERED INT
>>>>>>>   , RANDOMISED INT
>>>>>>>   , SMALL_VC VARCHAR(50)
>>>>>>>   , PADDING  VARCHAR(4000)
>>>>>>>   )
>>>>>>>   """
>>>>>>>   spark.sql(sqltext)
>>>>>>> start = 0
>>>>>>> if (rows == 0):
>>>>>>>   start = 1
>>>>>>> else:
>>>>>>>   maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
>>>>>>>   start = maxID + 1
>>>>>>> ## end must be set on both branches, otherwise it is undefined when rows == 0
>>>>>>> end = start + numRows - 1
>>>>>>> print ("starting at ID = ",start, ",ending on = ",end)
>>>>>>> Range = range(start, end+1)
>>>>>>> ## This traverses through the Range and increments "x" by one unit each
>>>>>>> ## time, and that x value is used in the code to generate random data
>>>>>>> ## through Python functions in a class
>>>>>>> print(numRows)
>>>>>>> print(uf.clustered(200,numRows))
>>>>>>> rdd = sc.parallelize(Range). \
>>>>>>>          map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>>>                            uf.scattered(x,10000), \
>>>>>>>                            uf.randomised(x,10000), \
>>>>>>>                            uf.randomString(50), \
>>>>>>>                            uf.padString(x," ",50), \
>>>>>>>                            uf.padSingleChar("x",4000)))
>>>>>>> df = rdd.toDF(). \
>>>>>>>      withColumnRenamed("_1","ID"). \
>>>>>>>      withColumnRenamed("_2", "CLUSTERED"). \
>>>>>>>      withColumnRenamed("_3", "SCATTERED"). \
>>>>>>>      withColumnRenamed("_4", "RANDOMISED"). \
>>>>>>>      withColumnRenamed("_5", "RANDOM_STRING"). \
>>>>>>>      withColumnRenamed("_6", "SMALL_VC"). \
>>>>>>>      withColumnRenamed("_7", "PADDING")
>>>>>>> And this is the run with error
>>>>>>> Started at
>>>>>>> 11/12/2020 14:42:45.45
>>>>>>> number of rows is  4500000
>>>>>>> starting at ID =  4500001 ,ending on =  4600000
>>>>>>> 100000
>>>>>>> 0.00199
>>>>>>> 20/12/11 14:42:56 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 33)
>>>>>>> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\\pyspark\", line 605, in main
>>>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\\pyspark\", line 597, in process
>>>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\\pyspark\", line 271, in dump_stream
>>>>>>>     vs = list(itertools.islice(iterator, batch))
>>>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\", line 1440, in takeUpToNumLeft
>>>>>>>     yield next(iterator)
>>>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\\pyspark\", line 107, in wrapper
>>>>>>>     return f(*args, **kwargs)
>>>>>>>   File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/", line 101, in <lambda>
>>>>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>>> NameError: name 'numRows' is not defined
>>>>>>> Regards,
>>>>>>> Mich
>>>>>>> On Fri, 11 Dec 2020 at 16:47, Sean Owen <> wrote:
>>>>>>>> Looks like a simple Python error - you haven't shown the code that
>>>>>>>> produces it. Indeed, I suspect you'll find there is no such symbol.
>>>>>>>> On Fri, Dec 11, 2020 at 9:09 AM Mich Talebzadeh <> wrote:
>>>>>>>>> Hi,
>>>>>>>>> This used to work but not anymore.
>>>>>>>>> I have a file that has these functions
>>>>>>>>> import random
>>>>>>>>> import string
>>>>>>>>> import math
>>>>>>>>> def randomString(length):
>>>>>>>>>     letters = string.ascii_letters
>>>>>>>>>     result_str = ''.join(random.choice(letters) for i in range(length))
>>>>>>>>>     return result_str
>>>>>>>>> def clustered(x,numRows):
>>>>>>>>>     return math.floor(x -1)/numRows
>>>>>>>>> def scattered(x,numRows):
>>>>>>>>>     return abs((x -1 % numRows))* 1.0
>>>>>>>>> def randomised(seed,numRows):
>>>>>>>>>     random.seed(seed)
>>>>>>>>>     return abs(random.randint(0, numRows) % numRows) * 1.0
>>>>>>>>> def padString(x,chars,length):
>>>>>>>>>     n = int(math.log10(x) + 1)
>>>>>>>>>     result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
>>>>>>>>>     return result_str
>>>>>>>>> def padSingleChar(chars,length):
>>>>>>>>>     result_str = ''.join(chars for i in range(length))
>>>>>>>>>     return result_str
>>>>>>>>> def println(lst):
>>>>>>>>>     for ll in lst:
>>>>>>>>>       print(ll[0])
>>>>>>>>> Now in the main.py module I import this file as follows:
>>>>>>>>> import UsedFunctions as uf
>>>>>>>>> Then I try the following
>>>>>>>>> import UsedFunctions as uf
>>>>>>>>>  numRows = 100000   ## do in increment of 100K rows
>>>>>>>>>  rdd = sc.parallelize(Range). \
>>>>>>>>>            map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>>>>>                              uf.scattered(x,10000), \
>>>>>>>>>                              uf.randomised(x,10000), \
>>>>>>>>>                              uf.randomString(50), \
>>>>>>>>>                              uf.padString(x," ",50), \
>>>>>>>>>                              uf.padSingleChar("x",4000)))
>>>>>>>>> The problem is that it now throws an error for numRows, as below:
>>>>>>>>>   File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/", line 101, in <lambda>
>>>>>>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>>>>> NameError: name 'numRows' is not defined
>>>>>>>>> I don't know why this error is coming!
>>>>>>>>> Appreciate any ideas
>>>>>>>>> Thanks,
>>>>>>>>> Mich
