Re: How can I get the same spark context in two different python processes

2022-12-12 Thread Kevin Su
Hi Jack,

My use case is a bit different: I created a subprocess instead of a thread, and I
can't pass the arguments to the subprocess.
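
For clarity, a minimal, runnable sketch of the pattern Jack describes below, assuming
plain PySpark with a local master; the tiny job inside my_func is just a placeholder.
Threads within one driver process can share a single SparkSession object, whereas a
separate process cannot simply receive it as an argument, which is the limitation above.

from threading import Thread

from pyspark.sql import SparkSession

# Assumes PySpark is installed and a local master is acceptable for the demo.
spark = SparkSession.builder.appName("spark").getOrCreate()

def my_func(session, n):
    # Runs a small job against the shared session; both threads reuse the
    # same driver-side SparkSession object.
    print(session.range(n).count())

threads = [Thread(target=my_func, args=(spark, 10)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()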

Jack Goodson wrote on Mon, Dec 12, 2022 at 8:03 PM:

> Apologies, the code should read as below:
>
> from threading import Thread
>
> context = pyspark.sql.SparkSession.builder.appName("spark").getOrCreate()
>
> t1 = Thread(target=my_func, args=(context,))
> t1.start()
>
> t2 = Thread(target=my_func, args=(context,))
> t2.start()
>
> On Tue, Dec 13, 2022 at 4:10 PM Jack Goodson wrote:
>
>> Hi Kevin,
>>
>> I had a similar use case (see the code below), though with something that wasn’t
>> Spark related. I think it should work for you; you may need to adjust the
>> context variable to suit your needs, but hopefully it gives the general
>> idea of sharing a single object between multiple threads.
>>
>> Thanks
>>
>>
>> from threading import Thread
>>
>> context = pyspark.sql.SparkSession.builder.appName("spark").getOrCreate()
>>
>> t1 = Thread(target=order_creator, args=(app_id, sleep_time,))
>> t1.start(target=my_func, args=(context,))
>>
>> t2 = Thread(target=order_creator, args=(app_id, sleep_time,))
>> t2.start(target=my_func, args=(context,))
>>
>


Re: How can I get the same spark context in two different python processes

2022-12-12 Thread Kevin Su
Maciej, thanks for the reply.
Could you share an example of how to achieve it?

Maciej wrote on Mon, Dec 12, 2022 at 4:41 PM:

> Technically speaking, it is possible in the stock distribution (I can't speak
> for Databricks) and not super hard to do (just check out how we
> initialize sessions), but it is definitely not something that we test or
> support, especially in the scenario you described.
>
> If you want to achieve concurrent execution, multithreading is normally
> more than sufficient and avoids problems with the context.
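
For reference, a small single-process sketch (an illustration only, not the
cross-process mechanism discussed in this thread): PySpark keeps track of the
session that has already been created, so code elsewhere in the same process
can look it up instead of building a new one.

from pyspark.sql import SparkSession

# Assumes PySpark 3.x running in a single Python process.
spark = SparkSession.builder.appName("spark").getOrCreate()

# Later, elsewhere in the same process:
active = SparkSession.getActiveSession()    # the session created above, or None
again = SparkSession.builder.getOrCreate()  # reuses the existing session
print(active is spark, again is spark)      # expected: True True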
>
>
>
> On 12/13/22 00:40, Kevin Su wrote:
> > I ran my Spark job as a Databricks job with a single Python script.
> > IIUC, the Databricks platform will create a Spark context for this
> > Python script.
> > However, I create a new subprocess in this script and run some Spark
> > code in it, but that subprocess can't find the
> > context created by Databricks.
> > I'm not sure if there is any API I can use to get the default context.
> >
> > bo yang <bobyan...@gmail.com> wrote on Mon, Dec 12, 2022 at 3:27 PM:
> >
> > In theory, maybe a Jupyter notebook or something similar could
> > achieve this? e.g. running a Jupyter kernel inside the Spark driver,
> > then another Python process could connect to that kernel.
> >
> > But in the end, this is like Spark Connect :)
> >
> >
> > On Mon, Dec 12, 2022 at 2:55 PM Kevin Su <pings...@gmail.com> wrote:
> >
> > Also, is there any way to workaround this issue without
> > using Spark connect?
> >
> > Kevin Su <pings...@gmail.com> wrote on Mon, Dec 12, 2022 at 2:52 PM:
> >
> > nvm, I found the ticket.
> > Also, is there any way to workaround this issue without
> > using Spark connect?
> >
> > Kevin Su <pings...@gmail.com> wrote on Mon, Dec 12, 2022 at 2:42 PM:
> >
> > Thanks for the quick response! Do we have a PR or Jira
> > ticket for it?
> >
> > Reynold Xin <r...@databricks.com> wrote on Mon, Dec 12, 2022 at 2:39 PM:
> >
> > Spark Connect :)
> >
> > (It’s a work in progress)
> >
> >
> > On Mon, Dec 12, 2022 at 2:29 PM, Kevin Su <pings...@gmail.com> wrote:
> >
> > Hey there, how can I get the same spark context in two different
> > python processes?
> > Let’s say I create a context in Process A, and then I want to use
> > python subprocess B to get the spark context created by Process A.
> > How can I achieve that?
> >
> > I've tried pyspark.sql.SparkSession.builder.appName("spark").getOrCreate(),
> > but it will create a new spark context.
> >
>
> --
> Best regards,
> Maciej Szymkiewicz
>
> Web: https://zero323.net
> PGP: A30CEF0C31A501EC
>
>


Re: How can I get the same spark context in two different python processes

2022-12-12 Thread Kevin Su
I ran my Spark job as a Databricks job with a single Python script.
IIUC, the Databricks platform will create a Spark context for this Python
script.
However, I create a new subprocess in this script and run some Spark code
in it, but that subprocess can't find the context created by
Databricks.
I'm not sure if there is any API I can use to get the default context.
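
To make the symptom concrete, here is a small sketch of it, under two assumptions:
plain PySpark rather than Databricks, and multiprocessing's "spawn" start method
standing in for the subprocess. The child starts as a fresh interpreter, so
getOrCreate() there cannot see the parent's context and brings up a new one.

import multiprocessing as mp

from pyspark.sql import SparkSession

def child():
    # Fresh interpreter: no shared JVM and no shared session with the parent.
    spark = SparkSession.builder.appName("spark").getOrCreate()
    print("child application:", spark.sparkContext.applicationId)

if __name__ == "__main__":
    parent = SparkSession.builder.appName("spark").getOrCreate()
    print("parent application:", parent.sparkContext.applicationId)

    ctx = mp.get_context("spawn")   # a brand-new Python process, like a subprocess
    p = ctx.Process(target=child)
    p.start()
    p.join()
    # The two application IDs differ: the child did not reuse the parent's context.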

bo yang wrote on Mon, Dec 12, 2022 at 3:27 PM:

> In theory, maybe a Jupyter notebook or something similar could achieve
> this? e.g. running a Jupyter kernel inside the Spark driver, then another
> Python process could connect to that kernel.
>
> But in the end, this is like Spark Connect :)
>
>
> On Mon, Dec 12, 2022 at 2:55 PM Kevin Su  wrote:
>
>> Also, is there any way to workaround this issue without using Spark
>> connect?
>>
>> Kevin Su wrote on Mon, Dec 12, 2022 at 2:52 PM:
>>
>>> nvm, I found the ticket.
>>> Also, is there any way to workaround this issue without using Spark
>>> connect?
>>>
>>> Kevin Su wrote on Mon, Dec 12, 2022 at 2:42 PM:
>>>
>>>> Thanks for the quick response! Do we have a PR or Jira ticket for it?
>>>>
>>>> Reynold Xin wrote on Mon, Dec 12, 2022 at 2:39 PM:
>>>>
>>>>> Spark Connect :)
>>>>>
>>>>> (It’s a work in progress)
>>>>>
>>>>>
>>>>> On Mon, Dec 12 2022 at 2:29 PM, Kevin Su  wrote:
>>>>>
>>>>>> Hey there, how can I get the same spark context in two different
>>>>>> python processes?
>>>>>> Let’s say I create a context in Process A, and then I want to use
>>>>>> python subprocess B to get the spark context created by Process A.
>>>>>> How can I achieve that?
>>>>>>
>>>>>> I've tried pyspark.sql.SparkSession.builder.appName("spark").getOrCreate(),
>>>>>> but it will create a new spark context.
>>>>>>
>>>>>


Re: How can I get the same spark context in two different python processes

2022-12-12 Thread Kevin Su
Also, is there any way to workaround this issue without using Spark connect?

Kevin Su wrote on Mon, Dec 12, 2022 at 2:52 PM:

> nvm, I found the ticket.
> Also, is there any way to workaround this issue without using Spark
> connect?
>
> Kevin Su wrote on Mon, Dec 12, 2022 at 2:42 PM:
>
>> Thanks for the quick response! Do we have a PR or Jira ticket for it?
>>
>> Reynold Xin wrote on Mon, Dec 12, 2022 at 2:39 PM:
>>
>>> Spark Connect :)
>>>
>>> (It’s a work in progress)
>>>
>>>
>>> On Mon, Dec 12 2022 at 2:29 PM, Kevin Su  wrote:
>>>
>>>> Hey there, how can I get the same spark context in two different python
>>>> processes?
>>>> Let’s say I create a context in Process A, and then I want to use
>>>> python subprocess B to get the spark context created by Process A. How can
>>>> I achieve that?
>>>>
>>>> I've
>>>> tried pyspark.sql.SparkSession.builder.appName("spark").getOrCreate(), but
>>>> it will create a new spark context.
>>>>
>>>


How can I get the same spark context in two different python processes

2022-12-12 Thread Kevin Su
Hey there, how can I get the same spark context in two different python
processes?
Let’s say I create a context in Process A, and then I want to use python
subprocess B to get the spark context created by Process A. How can I
achieve that?

I've tried pyspark.sql.SparkSession.builder.appName("spark").getOrCreate(),
but it will create a new spark context.
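
For readers finding this thread later: the Spark Connect approach Reynold suggests
above shipped in subsequent releases (Spark 3.4+). A sketch of what it looks like,
assuming a Spark Connect server is already running and reachable at the default
sc://localhost:15002 endpoint (see the Spark Connect documentation for starting one);
each Python process then acts as a thin client of the same long-running Spark
application instead of trying to own a SparkContext itself.

from pyspark.sql import SparkSession

# Sketch only: requires PySpark 3.4+ with the Spark Connect client installed
# and a Spark Connect server already running at this address.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
print(spark.range(5).count())

# Running the same snippet in a second Python process connects it to the
# same server-side Spark application.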


How to run spark benchmark on standalone cluster?

2021-07-02 Thread Kevin Su
Hi all,

I want to run the Spark benchmarks on a standalone cluster, so I changed
DataSourceReadBenchmark.scala to remove the hard-coded "spark.master" setting:

--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -48,7 +48,6 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
     val conf = new SparkConf()
       .setAppName("DataSourceReadBenchmark")
       // Since `spark.master` always exists, overrides this value
-      .set("spark.master", "local[1]")
       .setIfMissing("spark.driver.memory", "3g")
       .setIfMissing("spark.executor.memory", "3g")

I ran the benchmark using the command below:

bin/spark-submit \
  --driver-memory 16g \
  --master spark://kobe-pc:7077 \
  --class org.apache.spark.benchmark.Benchmarks \
  --jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
  "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
  "org.apache.spark.sql.execution.datasources.*"

I got the error below:

Driver stacktrace:
21/07/02 22:35:13 INFO DAGScheduler: Job 0 failed: apply at BenchmarkBase.scala:42, took 1.374943 s
21/07/02 22:35:13 ERROR FileFormatWriter: Aborting job a6ceeb0c-5f9d-44ca-a896-65d4a7b8b948.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 7) (192.168.103.14 executor 0): java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.execution.datasources.csv.CSVBenchmark$
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
  at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)


Fail to run benchmark in Github Action

2021-06-25 Thread Kevin Su
Hi all,

I tried to run a benchmark test in GitHub Actions in my fork, and I got the
error below:
https://github.com/pingsutw/spark/runs/2867617238?check_suite_focus=true
java.lang.AssertionError: assertion failed: spark.test.home is not set!
  at scala.Predef$.assert(Predef.scala:223)
  at org.apache.spark.deploy.worker.Worker.<init>(Worker.scala:148)
  at org.apache.spark.deploy.worker.Worker$.startRpcEnvAndEndpoint(Worker.scala:954)
  at org.apache.spark.deploy.LocalSparkCluster.$anonfun$start$2(LocalSparkCluster.scala:68)
  at org.apache.spark.deploy.LocalSparkCluster.$anonfun$start$2$adapted(LocalSparkCluster.scala:65)
  at scala.collection.immutable.Range.foreach(Range.scala:158)

After I added --driver-java-options "-Dspark.test.home=$GITHUB_WORKSPACE" to
benchmark.yml, I still got the error below:
https://github.com/pingsutw/spark/runs/2911027350?check_suite_focus=true
Do I need to set something up in my fork?

after 1900, vec on, rebase EXCEPTION      7474   7511    58   13.4    74.7   2.7X
after 1900, vec on, rebase LEGACY         9228   9296    60   10.8    92.3   2.2X
after 1900, vec on, rebase CORRECTED      7553   7678   128   13.2    75.5   2.7X
before 1900, vec off, rebase LEGACY      23280  23362    71    4.3   232.8   0.9X
before 1900, vec off, rebase CORRECTED   20548  20630   119    4.9   205.5   1.0X
before 1900, vec on, rebase LEGACY       12210  12239    37    8.2   122.1   1.7X
before 1900, vec on, rebase CORRECTED     7486   7489     2   13.4    74.9   2.7X

Running benchmark: Save TIMESTAMP_MICROS to parquet
  Running case: after 1900, noop
  Stopped after 1 iterations, 4003 ms
  Running case: before 1900, noop
  Stopped after 1 iterations, 3965 ms
  Running case: after 1900, rebase EXCEPTION
  Stopped after 1 iterations, 18339 ms
  Running case: after 1900, rebase LEGACY
  Stopped after 1 iterations, 18375 ms
  Running case: after 1900, rebase CORRECTED
  Stopped after 1 iterations, 18716 ms
  Running case: before 1900, rebase LEGACY

Error: The operation was canceled.