Hi all,

I managed to sort this one out the hard way, as the available PySpark
material is not as comprehensive as the Scala material. Frankly, sorting
this out was a bit of a struggle for me, but I managed to make it work.

In a nutshell, the script generates rows in Spark and saves them to a Hive
Parquet table through Spark. You can run the script any time to append more
rows. I already had the script in Scala, so I rewrote it in PySpark.

There is a lot of handy stuff in Scala, like case classes for defining
column headers etc., that does not seem to be available in Python (possibly
down to my lack of in-depth Python knowledge). The Spark documentation also
frequently states that features are available for Scala and Java but not Python.
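
The nearest stand-in I have found is a Row or an explicit StructType schema,
which lets you name and type the columns up front much as a case class would.
A minimal sketch (the column names simply mirror the ones used below; rdd and
sqlContext are as in the script further down):

from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

schema = StructType([
      StructField("ID", LongType(), True)
    , StructField("CLUSTERED", DoubleType(), True)
    , StructField("SCATTERED", DoubleType(), True)
    , StructField("RANDOMISED", DoubleType(), True)
    , StructField("RANDOM_STRING", StringType(), True)
    , StructField("SMALL_VC", StringType(), True)
    , StructField("PADDING", StringType(), True)
])
## df = sqlContext.createDataFrame(rdd, schema)   ## instead of toDF() plus the renames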

In the end, I managed to use lambda functions in PySpark to create an RDD
from the randomly generated columns. This is what I did:

start = 0
if (rows == 0):
  start = 1
else:
  maxID = sqlContext.sql("SELECT MAX(id) FROM test.randomDataPy").collect()[0][0]
  start = maxID + 1
end = start + numRows - 1
print ("starting at ID = ",start, ",ending on = ",end)
Range = range(start, end+1)
## This traverses the Range, incrementing "x" by one each time; that x value
## is used to generate random data through the Python functions in the class
rdd = sc.parallelize(Range). \
         map(lambda x: (x, usedFunctions.clustered(x,numRows), \
                           usedFunctions.scattered(x,numRows), \
                           usedFunctions.randomised(x,numRows), \
                           usedFunctions.randomString(50), \
                           usedFunctions.padString(x," ",50), \
                           usedFunctions.padSingleChar("x",4000)))
df = rdd.toDF(). \
     withColumnRenamed("_1","ID"). \
     withColumnRenamed("_2", "CLUSTERED"). \
     withColumnRenamed("_3", "SCATTERED"). \
     withColumnRenamed("_4", "RANDOMISED"). \
     withColumnRenamed("_5", "RANDOM_STRING"). \
     withColumnRenamed("_6", "SMALL_VC"). \
     withColumnRenamed("_7", "PADDING")
df.printSchema()

This is the output

('number of rows is ', 200000)
('starting at ID = ', 200001, ',ending on = ', 250000)
root
 |-- ID: long (nullable = true)
 |-- CLUSTERED: double (nullable = true)
 |-- SCATTERED: double (nullable = true)
 |-- RANDOMISED: double (nullable = true)
 |-- RANDOM_STRING: string (nullable = true)
 |-- SMALL_VC: string (nullable = true)
 |-- PADDING: string (nullable = true)
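
As an aside, toDF() in PySpark can also take a list of column names directly,
which would save the chain of withColumnRenamed calls. Something like this
should work (same rdd as above):

df = rdd.toDF(["ID", "CLUSTERED", "SCATTERED", "RANDOMISED", "RANDOM_STRING", "SMALL_VC", "PADDING"])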

Even small things like show() tripped me up. This is what I had to use;
with a plain show() I was getting an error message:

sqlContext.sql("""SELECT MIN(id) AS minID, MAX(id) AS maxID FROM test.randomDataPy""").show(n=20,truncate=False,vertical=False)
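
For what it is worth, the positional form also works and avoids naming the
vertical argument, which as far as I know only exists from Spark 2.3 onwards:

sqlContext.sql("""SELECT MIN(id) AS minID, MAX(id) AS maxID FROM test.randomDataPy""").show(20, False)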

In order to make this work with Hive, I had to add
*spark.sql.catalogImplementation=hive* to spark-submit, otherwise it was
throwing errors:

spark-submit --master yarn --deploy-mode client --conf spark.sql.catalogImplementation=hive <file.py>
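
Alternatively, on Spark 2.x I believe the same can be achieved by enabling
Hive support when building the session inside the script, rather than passing
the conf on the command line. A rough sketch (the app name is just illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
          .appName("randomDataPy") \
          .enableHiveSupport() \
          .getOrCreate()
## then use spark.sql(...) in place of sqlContext.sql(...)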

Anyway, I have attached both the Scala code and the Python code. Both
generate random column values, create the Hive table if it does not exist,
and store the rows in Parquet format. If the table exists, new rows are
appended. Any feedback will be much appreciated (negative or positive, so
to speak).

Thanks,

Mich

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 13 Oct 2020 at 23:46, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> I generate an array of random data and create a DF in Spark scala as
> follows
>
> val end = start + numRows - 1
> println (" starting at ID = " + start + " , ending on = " +  end )
> val usedFunctions = new UsedFunctions
>
> val text = ( start to end ).map(i =>
>              (
>                  i.toString
>                , usedFunctions.clustered(i,numRows).toString
>                , usedFunctions.scattered(i,numRows).toString
>                , usedFunctions.randomised(i,numRows).toString
>                , usedFunctions.randomString(chars.mkString(""),50)
>                , usedFunctions.padString(i, " ", 50)
>                , usedFunctions.padSingleChar("x ", 4000)
>              )
>            ).
>     toArray
>
> then I create a DF
> val df = sc.parallelize(text).
>                               map(p => columns(
>                                                   p._1.toString.toInt
>                                                 , p._2.toString.toDouble
>                                                 , p._3.toString.toDouble
>                                                 , p._4.toString.toDouble
>                                                 , p._5.toString
>                                                 , p._6.toString
>                                                 , p._7.toString
>                                               )
>                                  ).
>     toDF
>
>
> What is the equivalent of this in Pyspark, especially the first part val
> text = ..
>
>
> Thanks
>
>
> Mich
>
>
>  *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary damages
> arising from such loss, damage or destruction.
>
>
>
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col
import random
import string
import math

class UsedFunctions:

  def randomString(self,length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

  def clustered(self,x,numRows):
    return math.floor(x -1)/numRows

  def scattered(self,x,numRows):
    return abs((x - 1) % numRows) * 1.0

  def randomised(self,seed,numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

  def padString(self,x,chars,length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
    return result_str

  def padSingleChar(self,chars,length):
    result_str = ''.join(chars for i in range(length))
    return result_str

  def println(self,lst):
    for ll in lst:
      print(ll[0])

usedFunctions = UsedFunctions()

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
HiveContext = HiveContext(sc)

lst = (sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
print("\nStarted at");usedFunctions.println(lst)

numRows = 50000   ## do in increments of 50K rows, otherwise you blow up driver memory!
#
## Check if table exist otherwise create it
#
DB = "test"
tableName = "randomDataPy"
fullyQualifiedTableName =  DB + "."+ tableName
rows = 0
sqltext  = ""
if (sqlContext.sql("SHOW TABLES IN test like 'randomDataPy'").count() == 1):
  rows = sqlContext.sql("SELECT COUNT(1) FROM test.randomDataPy").collect()[0][0]
  print ("number of rows is ",rows)
else:
  print("\nTable test.randomDataPy does not exist, creating table ")
  sqltext = """
     CREATE TABLE test.randomDataPy(
       ID INT
     , CLUSTERED INT
     , SCATTERED INT
     , RANDOMISED INT
     , RANDOM_STRING VARCHAR(50)
     , SMALL_VC VARCHAR(50)
     , PADDING  VARCHAR(4000)
    )
    STORED AS PARQUET
    """
  sqlContext.sql(sqltext)

start = 0
if (rows == 0):
  start = 1
else:
  maxID = sqlContext.sql("SELECT MAX(id) FROM test.randomDataPy").collect()[0][0]
  start = maxID + 1
end = start + numRows - 1
print ("starting at ID = ",start, ",ending on = ",end)
Range = range(start, end+1)
## This traverses the Range, incrementing "x" by one each time; that x value
## is used to generate random data through the Python functions in the class
rdd = sc.parallelize(Range). \
         map(lambda x: (x, usedFunctions.clustered(x,numRows), \
                           usedFunctions.scattered(x,numRows), \
                           usedFunctions.randomised(x,numRows), \
                           usedFunctions.randomString(50), \
                           usedFunctions.padString(x," ",50), \
                           usedFunctions.padSingleChar("x",4000)))
df = rdd.toDF(). \
     withColumnRenamed("_1","ID"). \
     withColumnRenamed("_2", "CLUSTERED"). \
     withColumnRenamed("_3", "SCATTERED"). \
     withColumnRenamed("_4", "RANDOMISED"). \
     withColumnRenamed("_5", "RANDOM_STRING"). \
     withColumnRenamed("_6", "SMALL_VC"). \
     withColumnRenamed("_7", "PADDING")
df.printSchema()
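## Register the DataFrame as a temporary view and append its rows to the Hive table via SQL INSERT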
df.createOrReplaceTempView("tmp")
sqltext = """
  INSERT INTO TABLE test.randomDataPy
  SELECT
          ID
        , CLUSTERED
        , SCATTERED
        , RANDOMISED
        , RANDOM_STRING
        , SMALL_VC
        , PADDING
  FROM tmp
  """
sqlContext.sql(sqltext)
sqlContext.sql("""SELECT MIN(id) AS minID, MAX(id) AS maxID FROM test.randomDataPy""").show(n=20,truncate=False,vertical=False)
##sqlContext.sql("""SELECT * FROM test.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
lst = (sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
print("\nFinished at");usedFunctions.println(lst)
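
As a side note, instead of going through the temporary view and the SQL INSERT
at the end of the script, I believe the DataFrame writer could append straight
into the existing Hive table, along these lines:

df.write.mode("append").insertInto("test.randomDataPy")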

Attachment: dynamic_ARRAY_generator_parquet.scala
Description: Binary data
