What does spark.python.worker.memory affect?

2017-07-20 Thread Cyanny LIANG
Hi
As the documentation says:
spark.python.worker.memory
Amount of memory to use per python worker process during aggregation, in
the same format as JVM memory strings (e.g. 512m, 2g). If the memory used
during aggregation goes above this amount, it will spill the data into
disks.

I searched for this config in the Spark source code, and only rdd.py uses the option.
That means the option only takes effect in Python RDD operations such as
rdd.groupByKey or rdd.sortByKey. The Python ExternalSorter or ExternalMerger
will spill data to disk when memory reaches the spark.python.worker.memory limit.
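
For concreteness, here is a minimal PySpark sketch of the kind of job where this
limit should apply; the 512m value and the generated data are just placeholders
for illustration:

from pyspark import SparkConf, SparkContext

# Placeholder limit for illustration only.
conf = (SparkConf()
        .setAppName("python-worker-memory-demo")
        .set("spark.python.worker.memory", "512m"))
sc = SparkContext(conf=conf)

# groupByKey aggregates inside the Python worker; once its in-memory buffers
# exceed spark.python.worker.memory, ExternalMerger spills to disk.
pairs = sc.parallelize(range(1000000)).map(lambda i: (i % 100, i))
grouped = pairs.groupByKey().mapValues(len)
print(grouped.take(5))

sc.stop()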

When PythonRunner forks a python worker subprocess, what is the memory limit
for each python worker? Does spark.python.worker.memory affect the memory
of a python worker?

-- 
Best & Regards
Cyanny LIANG
email: lgrcya...@gmail.com


Will different receivers run on different worker?

2016-11-21 Thread Cyanny LIANG
Hi, I am new to Spark Streaming.
In our project we want to implement a custom receiver to subscribe to our log
data.
I have two questions:

1. Do multiple DStream receivers run in different processes or in different threads?
2. When we union multiple DStreams, for example 10 DStreams, we observed that
Spark creates 10 jobs. How many receivers will be started on a worker? We found
that if the application has 5 executors, the 10 receivers are started on those
executors at random. Is that right? (A sketch of the union setup we mean is below.)
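
A rough PySpark sketch of the setup, where socketTextStream stands in for our
custom log receiver and the host name and ports are just placeholders:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="multi-receiver-union-demo")
ssc = StreamingContext(sc, batchDuration=10)

# Each socketTextStream creates its own receiver; "log-host" and the ports
# are placeholders for our real log source.
streams = [ssc.socketTextStream("log-host", 9000 + i) for i in range(10)]

# Union the 10 receiver streams into a single DStream for downstream processing.
unioned = ssc.union(*streams)
unioned.count().pprint()

ssc.start()
ssc.awaitTermination()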


-- 
Cyanny LIANG


1TB shuffle failed with executor lost failure

2016-09-19 Thread Cyanny LIANG
My job joins a 1 TB table with a 10 GB table on Spark 1.6.1,
running in YARN mode:

1. If I enable the external shuffle service, the error is:
Job aborted due to stage failure: ShuffleMapStage 2 (writeToDirectory at
NativeMethodAccessorImpl.java:-2) has failed the maximum allowable number
of times: 4. Most recent failure reason:
org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException:
Executor is not registered (appId=application_1473819702737_1239, execId=52)
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:105)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)

2. If I disable the shuffle service and set spark.executor.instances to 80,
the error is:
ExecutorLostFailure (executor 71 exited caused by one of the running tasks)
Reason: Container marked as failed:
container_1473819702737_1432_01_406847560 on host:
nmg01-spark-a0021.nmg01.baidu.com. Exit status: 52. Diagnostics: Exception
from container-launch: ExitCodeException exitCode=52:
ExitCodeException exitCode=52:

These errors are reported in the shuffle stage.
My data is skewed: some ids have 400 million rows, while other ids have only
1 million rows. Does anybody have ideas to solve the problem?
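
For illustration, one common workaround for this kind of skew is to salt the
hot keys before the join; a rough PySpark sketch, where big_rdd and small_rdd
are placeholders for the 1 TB and 10 GB datasets keyed by id:

import random

NUM_SALTS = 50  # placeholder fan-out factor

# Spread each id of the big side across NUM_SALTS synthetic keys.
salted_big = big_rdd.map(
    lambda kv: ((kv[0], random.randint(0, NUM_SALTS - 1)), kv[1]))

# Replicate every row of the small side once per salt so all salted keys match.
salted_small = small_rdd.flatMap(
    lambda kv: [((kv[0], s), kv[1]) for s in range(NUM_SALTS)])

# Join on the salted key, then drop the salt to recover the original id.
joined = (salted_big.join(salted_small)
          .map(lambda kv: (kv[0][0], kv[1])))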


3. My config is:
I use tungsten-sort in off-heap mode; in on-heap mode the OOM problem is even
more serious.

spark.driver.cores 4
spark.driver.memory 8g

# used in client mode
spark.yarn.am.memory 8g
spark.yarn.am.cores 4

spark.executor.memory 8g
spark.executor.cores 4
spark.yarn.executor.memoryOverhead 6144

spark.memory.offHeap.enabled true
spark.memory.offHeap.size 40

Best & Regards
Cyanny LIANG