Hi all,

I managed to sort this one out largely by trial and error, as the available PySpark material is not as comprehensive as the Scala material. Frankly it was a bit of a struggle, but I got it working.
What the script does, in a nutshell, is generate rows in Spark and save them to a Hive Parquet table through Spark. You can run the script any time to append more rows.

I had the script in Scala, so I rewrote it in PySpark. There is a lot of handy stuff in Scala, like case classes for defining column headers etc., that does not seem to be available in Python (possibly my lack of in-depth Python knowledge), and the Spark documentation frequently states that features are available in Scala and Java but not Python. A rough workaround using pyspark.sql.Row is sketched below, after the quoted message.

However, I managed to use lambda functions in PySpark to create an RDD from the randomly generated columns. This is what I did:

start = 0
if (rows == 0):
  start = 1
else:
  maxID = sqlContext.sql("SELECT MAX(id) FROM test.randomDataPy").collect()[0][0]
  start = maxID + 1
end = start + numRows - 1
print ("starting at ID = ", start, ",ending on = ", end)
Range = range(start, end + 1)
## This traverses the Range, incrementing "x" by one each time; that x value is used to generate random data through the Python functions in a class
rdd = sc.parallelize(Range). \
         map(lambda x: (x, usedFunctions.clustered(x, numRows), \
                           usedFunctions.scattered(x, numRows), \
                           usedFunctions.randomised(x, numRows), \
                           usedFunctions.randomString(50), \
                           usedFunctions.padString(x, " ", 50), \
                           usedFunctions.padSingleChar("x", 4000)))
df = rdd.toDF(). \
     withColumnRenamed("_1", "ID"). \
     withColumnRenamed("_2", "CLUSTERED"). \
     withColumnRenamed("_3", "SCATTERED"). \
     withColumnRenamed("_4", "RANDOMISED"). \
     withColumnRenamed("_5", "RANDOM_STRING"). \
     withColumnRenamed("_6", "SMALL_VC"). \
     withColumnRenamed("_7", "PADDING")
df.printSchema()

This is the output:

('number of rows is ', 200000)
('starting at ID = ', 200001, ',ending on = ', 250000)
root
 |-- ID: long (nullable = true)
 |-- CLUSTERED: double (nullable = true)
 |-- SCATTERED: double (nullable = true)
 |-- RANDOMISED: double (nullable = true)
 |-- RANDOM_STRING: string (nullable = true)
 |-- SMALL_VC: string (nullable = true)
 |-- PADDING: string (nullable = true)

Even small things like show() tripped me up. This is what I had to use; with a bare show() I was getting an error message:

sqlContext.sql("""SELECT MIN(id) AS minID, MAX(id) AS maxID FROM test.randomDataPy""").show(n=20, truncate=False, vertical=False)

In order to make this work with Hive I had to add spark.sql.catalogImplementation=hive to spark-submit, otherwise it was throwing errors (an alternative is sketched in the PS below):

spark-submit --master yarn --deploy-mode client --conf spark.sql.catalogImplementation=hive <file.py>

Anyway, I have attached both the Scala code and the Python code. They generate random column values and store them in a Hive table in Parquet format; if the table already exists, new rows are appended. Any feedback will be much appreciated (negative or positive, so to speak).

Thanks,

Mich

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
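PS. As an alternative to passing spark.sql.catalogImplementation=hive on the spark-submit command line, the Hive catalog can (from Spark 2.x onwards) be enabled when the session is built. This is only a sketch and not what the attached scripts do:

from pyspark.sql import SparkSession

# build a session with Hive catalog support instead of using
# --conf spark.sql.catalogImplementation=hive on spark-submit
spark = SparkSession.builder \
    .appName("randomDataPy") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("SHOW TABLES IN test").show(truncate=False)

The attached scripts use SQLContext/HiveContext instead, which also works; the SparkSession route just keeps the catalog setting inside the code rather than on the command line.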
On Tue, 13 Oct 2020 at 23:46, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Hi,
>
> I generate an array of random data and create a DF in Spark scala as follows
>
> val end = start + numRows - 1
> println (" starting at ID = " + start + " , ending on = " + end )
> val usedFunctions = new UsedFunctions
>
> val text = ( start to end ).map(i =>
>   (
>     i.toString
>   , usedFunctions.clustered(i,numRows).toString
>   , usedFunctions.scattered(i,numRows).toString
>   , usedFunctions.randomised(i,numRows).toString
>   , usedFunctions.randomString(chars.mkString(""),50)
>   , usedFunctions.padString(i, " ", 50)
>   , usedFunctions.padSingleChar("x ", 4000)
>   )
> ).
> toArray
>
> then I create a DF
>
> val df = sc.parallelize(text).
>   map(p => columns(
>     p._1.toString.toInt
>   , p._2.toString.toDouble
>   , p._3.toString.toDouble
>   , p._4.toString.toDouble
>   , p._5.toString
>   , p._6.toString
>   , p._7.toString
>   )
> ).
> toDF
>
> What is the equivalent of this in Pyspark, especially the first part val text = ..
>
> Thanks
>
> Mich
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
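To answer the quoted question directly: the closest thing I found in PySpark to the Scala case class "columns" is the pyspark.sql.Row factory, and the "( start to end ).map(i => ...)" part becomes a plain list comprehension. This is only a sketch, reusing usedFunctions, numRows, start and end from the script below, not a separately tested program:

from pyspark.sql import Row

# Row(...) builds a factory that plays the role of the Scala case class,
# carrying the column names
Columns = Row("ID", "CLUSTERED", "SCATTERED", "RANDOMISED",
              "RANDOM_STRING", "SMALL_VC", "PADDING")

# rough equivalent of: val text = (start to end).map(i => (...)).toArray
text = [Columns(i,
                usedFunctions.clustered(i, numRows),
                usedFunctions.scattered(i, numRows),
                usedFunctions.randomised(i, numRows),
                usedFunctions.randomString(50),
                usedFunctions.padString(i, " ", 50),
                usedFunctions.padSingleChar("x", 4000))
        for i in range(start, end + 1)]

# the Row objects already carry the column names, so no withColumnRenamed chain is needed
df = sc.parallelize(text).toDF()
df.printSchema()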
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col
import random
import string
import math

class UsedFunctions:

  def randomString(self, length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

  def clustered(self, x, numRows):
    return math.floor(x - 1) / numRows

  def scattered(self, x, numRows):
    return abs((x - 1) % numRows) * 1.0

  def randomised(self, seed, numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

  def padString(self, x, chars, length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
    return result_str

  def padSingleChar(self, chars, length):
    result_str = ''.join(chars for i in range(length))
    return result_str

  def println(self, lst):
    for ll in lst:
      print(ll[0])

usedFunctions = UsedFunctions()
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
HiveContext = HiveContext(sc)

lst = (sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
print("\nStarted at"); usedFunctions.println(lst)

numRows = 50000  ## do this in increments of 50K rows otherwise you blow up the driver memory!
#
## Check if the table exists, otherwise create it
#
DB = "test"
tableName = "randomDataPy"
fullyQualifiedTableName = DB + "." + tableName
rows = 0
sqltext = ""
if (sqlContext.sql("SHOW TABLES IN test like 'randomDataPy'").count() == 1):
  rows = sqlContext.sql("SELECT COUNT(1) FROM test.randomDataPy").collect()[0][0]
  print("number of rows is ", rows)
else:
  print("\nTable test.randomDataPy does not exist, creating table ")
  sqltext = """
     CREATE TABLE test.randomDataPy(
       ID INT
     , CLUSTERED INT
     , SCATTERED INT
     , RANDOMISED INT
     , RANDOM_STRING VARCHAR(50)
     , SMALL_VC VARCHAR(50)
     , PADDING VARCHAR(4000)
     )
     STORED AS PARQUET
     """
  sqlContext.sql(sqltext)

start = 0
if (rows == 0):
  start = 1
else:
  maxID = sqlContext.sql("SELECT MAX(id) FROM test.randomDataPy").collect()[0][0]
  start = maxID + 1
end = start + numRows - 1
print("starting at ID = ", start, ",ending on = ", end)
Range = range(start, end + 1)
## This traverses the Range, incrementing "x" by one each time; that x value is used to generate random data through the Python functions in the class above
rdd = sc.parallelize(Range). \
         map(lambda x: (x, usedFunctions.clustered(x, numRows), \
                           usedFunctions.scattered(x, numRows), \
                           usedFunctions.randomised(x, numRows), \
                           usedFunctions.randomString(50), \
                           usedFunctions.padString(x, " ", 50), \
                           usedFunctions.padSingleChar("x", 4000)))
df = rdd.toDF(). \
     withColumnRenamed("_1", "ID"). \
     withColumnRenamed("_2", "CLUSTERED"). \
     withColumnRenamed("_3", "SCATTERED"). \
     withColumnRenamed("_4", "RANDOMISED"). \
     withColumnRenamed("_5", "RANDOM_STRING"). \
     withColumnRenamed("_6", "SMALL_VC"). \
     withColumnRenamed("_7", "PADDING")
df.printSchema()
df.createOrReplaceTempView("tmp")
sqltext = """
  INSERT INTO TABLE test.randomDataPy
  SELECT
          ID
        , CLUSTERED
        , SCATTERED
        , RANDOMISED
        , RANDOM_STRING
        , SMALL_VC
        , PADDING
  FROM tmp
  """
sqlContext.sql(sqltext)
sqlContext.sql("""SELECT MIN(id) AS minID, MAX(id) AS maxID FROM test.randomDataPy""").show(n=20, truncate=False, vertical=False)
##sqlContext.sql("""SELECT * FROM test.randomDataPy ORDER BY id""").show(n=20, truncate=False, vertical=False)
lst = (sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
print("\nFinished at"); usedFunctions.println(lst)
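As a possible tidy-up to the script above (not something the attached code does), the rdd.toDF() call plus the chain of withColumnRenamed calls could be replaced by createDataFrame with an explicit schema, which also makes the column types explicit. A sketch only, assuming the same rdd and sqlContext as above:

from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

# explicit schema instead of letting toDF() infer _1 ... _7 and renaming them
schema = StructType([
    StructField("ID", LongType(), True),
    StructField("CLUSTERED", DoubleType(), True),
    StructField("SCATTERED", DoubleType(), True),
    StructField("RANDOMISED", DoubleType(), True),
    StructField("RANDOM_STRING", StringType(), True),
    StructField("SMALL_VC", StringType(), True),
    StructField("PADDING", StringType(), True),
])

df = sqlContext.createDataFrame(rdd, schema)
df.printSchema()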
dynamic_ARRAY_generator_parquet.scala