I am using the Jupyter Docker Stacks image with Spark. I started a new notebook and ran this code:
import multiprocessing
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import time

t1 = time.time()
number_cores = int(multiprocessing.cpu_count())
memory_gb = 4


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.adaptive.enabled", "True"
    ).set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "100"
    ).set(
        "sc.setLogLevel", "ERROR"
    ).set(
        "spark.executor.memory", "8g")
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My_app", SparkConf())
This gives me the following in http://HOSTIP:4040/environment/
[image: image.png]
So it works.
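If the UI is not handy, the effective values can also be read back from the running session. A minimal sketch, assuming the spark session created above:

# Print the configuration the driver JVM actually picked up
for key in ("spark.master", "spark.driver.memory", "spark.executor.memory"):
    print(key, "=", spark.sparkContext.getConf().get(key, "not set"))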
On Sat, 5 Nov 2022 at 19:21, 张健BJ wrote:
> Ok, thank you very much :) I also have two questions:
> Does spark.read.format("jdbc") read all the data from the database at
> once, and does it require a limit? In my tests, local memory usage does
> not change significantly as the amount of data grows. Why?
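A note on this first question: the JDBC read is lazy, so nothing is fetched until an action such as count() or a write runs, and by default the whole table is then pulled through a single connection. A LIMIT can be pushed down to MySQL by passing a subquery as dbtable, and fetchsize controls how many rows come back per round trip. A minimal sketch, reusing the connection details from the mail below; the subquery alias and fetch size are illustrative:

df = spark.read.format("jdbc").options(
    url='jdbc:mysql://127.0.0.1:63306/recommend?useSSL=false',
    driver='com.mysql.jdbc.Driver',
    # dbtable may be a subquery, so the LIMIT runs in MySQL rather than in Spark
    dbtable="(SELECT * FROM item_info LIMIT 20) AS item_info_sub",
    user="root",
    password="root",
    fetchsize="1000",  # rows fetched per round trip from MySQL
).load()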
>
> In addition, I tried to set "spark.driver.memory" and "spark.executor.memory"
> to 4g in local mode, but the setting did not seem to take effect; memory
> usage stayed at about 1g (see the note after the code below). The code is
> as follows:
>
> import multiprocessing
> from pyspark.sql import SparkSession
> from pyspark import SparkConf, SparkContext
>
> import time
>
> t1 = time.time()
> number_cores = int(multiprocessing.cpu_count())
> memory_gb = 4
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
>     conf.setMaster("local[{}]".format(number_cores))
>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>         "spark.sql.adaptive.enabled", "True"
>     ).set(
>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>     ).set(
>         "spark.sql.repl.eagerEval.maxNumRows", "100"
>     ).set(
>         "sc.setLogLevel", "ERROR"
>     ).set(
>         "spark.executor.memory", "4g")
>
>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
>
> spark = get_spark_session("My_app", SparkConf())
>
> df = spark.read.format("jdbc").options(
>     url='jdbc:mysql://127.0.0.1:63306/recommend?useSSL=false',
>     driver='com.mysql.jdbc.Driver',
>     dbtable="item_info",
>     user="root",
>     password="root"
> ).load()
> my_url = 'jdbc:mysql://127.0.0.1:63306/etl?useSSL=false'
> auth_mysql = {'user': 'root', 'password': 'root'}
> df = df.withColumnRenamed("id", "itemid").withColumnRenamed("category", "cateid") \
>     .withColumnRenamed('weight', 'score').withColumnRenamed('tag', 'item_tags') \
>     .withColumnRenamed('modify_time', 'item_modify_time') \
>     .withColumnRenamed('start_time', 'dg_start_time') \
>     .withColumnRenamed('end_time', 'dg_end_time')
> df = df.select(
>     ['itemid', 'cateid', 'title', 'score', 'item_tags', 'item_modify_time',
>      'dg_start_time', 'dg_end_time']).limit(20)
> df.write.jdbc(my_url, 'item_info', mode='append', properties=auth_mysql)
> print(time.time() - t1)
>
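On the second question about spark.driver.memory: in local mode the executor runs inside the driver JVM, so spark.executor.memory has little effect there, and spark.driver.memory only takes effect if it is set before that JVM starts, i.e. before the first SparkSession/SparkContext of the Python process is created. If a session already exists in the notebook kernel, getOrCreate() simply returns it and the new memory setting is ignored; note also that the JVM only grows its heap on demand, so OS-level memory can stay near 1g until the data actually needs it. A minimal sketch, assuming a freshly restarted kernel with no Spark session created yet:

from pyspark.sql import SparkSession

# Run this as the first Spark code in a fresh kernel; once the driver JVM
# is up, later changes to spark.driver.memory are ignored.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("My_app")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Verify what the driver JVM actually picked up
print(spark.sparkContext.getConf().get("spark.driver.memory"))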
> --
> From: Bjørn Jørgensen
> Sent: Saturday, 5 November 2022 04:51
> To: Sean Owen
> Cc: 张健BJ ; user
> Subject: Re: spark - local question
>
> Yes, Spark in local mode works :)
> One tip: if you just start it, the defaults are one core and 1 GB.
>
> I'm using this function to start Spark in local mode with all cores and
> the maximum available RAM:
>
> import multiprocessing
> import os
> from pyspark.sql import SparkSession
> from pyspark import SparkConf, SparkContext
>
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
>     conf.setMaster("local[{}]".format(number_cores))
>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
> "spark.sql.adaptive.enabled", &quo