Hi, I'm interested in figuring out how the Python API for Spark works. I've
come to the following conclusions and want to share them with the
community; they could be of use in the PySpark docs, specifically the
"Execution and pipelining" part.
Any sanity checking would be much appreciated. Here's the trivial Python
example I've traced:
from pyspark import SparkContext
sc = SparkContext("local[1]", "Adam test")
sc.setCheckpointDir("foo checkpoint dir")
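For reference, it's the SparkContext constructor on the first line that brings
the JVM into the picture: PySpark launches it and connects to it over a local
socket via Py4j. A rough sketch of the equivalent using PySpark's internal
gateway helper (internal API, so treat the names as approximate; not what I
traced above, just for orientation):

from pyspark.java_gateway import launch_gateway

# Spawns the JVM (via spark-submit) and connects a Py4j JavaGateway to it
# over a local socket; SparkContext does this for us behind the scenes.
gateway = launch_gateway()
# JVM classes are then reachable through the gateway and resolved by reflection:
JavaSparkContext = gateway.jvm.org.apache.spark.api.java.JavaSparkContext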
To trace the JVM side, I added this JVM option:
export IBM_JAVA_OPTIONS="-Xtrace:methods={org/apache/spark/*,py4j/*},print=mt"
I also added print statements in py4j-java/src/py4j/commands/CallCommand.java,
specifically in the execute method, then built the class and replaced the
existing one in the py4j 0.9 jar inside my Spark assembly jar. Example
output:
In execute for CallCommand, commandName: c
target object id: o0
methodName: get
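For what it's worth, this trace looks Py4j-generic rather than Spark-specific:
a bare Py4j session produces the same kind of "c" (call) command traffic. A
minimal sketch, assuming a Py4j GatewayServer is already listening on the Java
side on the default port (nothing here is Spark):

from py4j.java_gateway import JavaGateway

# Connects to an already-running GatewayServer on the default port.
gateway = JavaGateway()
m = gateway.jvm.java.util.HashMap()   # the JVM registers the new object under an id like "o0"
m.put("answer", 42)                   # sent over the socket as a "c" command
print(m.get("answer"))                # another "c" command; both land in CallCommand.execute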
I'll launch the Spark application with:
$SPARK_HOME/bin/spark-submit --master local[1] Adam.py > checkme.txt 2>&1
I've quickly put together the following WIP diagram of what I think is
happening:
http://postimg.org/image/nihylmset/
To summarise, I think:
- We're heavily using reflection (as evidenced by Py4j's ReflectionEngine and
MethodInvoker classes) to invoke Spark's API in a JVM from Python.
- There's an agreed protocol (in Py4j's Protocol.java) for handling commands;
these commands are exchanged over a local socket between Python and our JVM
(the driver, based on the docs, not the master).
- The Spark API is therefore accessible by means of commands exchanged over
that socket using the agreed protocol.
- Commands are read/written using a BufferedReader/BufferedWriter pair.
- Type conversion from Python to Java is also performed (I haven't looked at
this in detail yet).
- We keep track of JVM objects by id, with, for example, o0 representing the
first object we know about (see the sketch after this list).
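As a concrete illustration of that last point, here's a small probe of the
object ids from the Python side (untested sketch; _jsc and _target_id are
PySpark/Py4j internals, used here purely to peek at the bookkeeping):

# Assumes the `sc` from the example above.
conf = sc._jsc.getConf()            # JVM call returning a Py4j JavaObject proxy for the SparkConf
print(conf._target_id)              # the "oNN" id the Java-side gateway uses to look the object up
print(conf.get("spark.app.name"))   # a call like this is what shows up as a "c" command in the trace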
Does this sound correct?
I've only checked the trace output in local mode, so I'm curious what happens
when we're running in standalone mode (I didn't see a Python interpreter
appearing on all workers in order to process partitions of data, so I assume
that in standalone mode we use Python solely as an orchestrator - the driver -
and not as an executor for distributed computing?).
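One experiment I plan to run to check that: the example above never ships any
Python code to the executors, so if my mental model is right, an RDD operation
with a Python lambda is what should make Python worker processes appear on the
workers. A rough sketch (the master URL is hypothetical):

from pyspark import SparkContext

sc = SparkContext("spark://master:7077", "Adam standalone test")  # hypothetical master URL
rdd = sc.parallelize(range(1000), 8)
# The lambda below can only run in a Python interpreter, so (if I've understood
# correctly) this should be what makes Python worker processes show up on the
# executor machines.
print(rdd.map(lambda x: x * 2).sum())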
Happy to provide the full trace output on request (I've omitted timestamps and
logging info, and added spacing). I expect there's an O*JDK method-tracing
equivalent, so the above can easily be reproduced regardless of Java vendor.
Cheers,