Difference between 'cores' config params: spark submit on k8s

2019-03-07 Thread Battini Lakshman
Hello,

I understand we need to specify the 'spark.kubernetes.driver.limit.cores'
and 'spark.kubernetes.executor.limit.cores' config parameters while
submitting Spark to a Kubernetes namespace that has a resource quota applied.

There are also the config parameters 'spark.driver.cores' and
'spark.executor.cores' mentioned in the documentation. What is the difference
between 'spark.driver.cores' and 'spark.kubernetes.driver.limit.cores',
please?

Thanks!

Best Regards,
Lakshman B.
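
For context, on Kubernetes 'spark.driver.cores' / 'spark.executor.cores' tell Spark how many
cores to use and are generally also translated into the pods' CPU requests, while
'spark.kubernetes.driver.limit.cores' / 'spark.kubernetes.executor.limit.cores' set the pods'
hard CPU limits (the values a ResourceQuota on limits checks). These keys are normally passed
to spark-submit as --conf flags; the sketch below shows the same keys through the SparkLauncher
API, with hypothetical values:

import org.apache.spark.launcher.SparkLauncher

// Sketch only; master URL, jar path and values are placeholders.
val app = new SparkLauncher()
  .setMaster("k8s://...")
  .setDeployMode("cluster")
  .setMainClass("org.apache.spark.examples.SparkPi")
  .setAppResource("local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar")
  // Cores Spark uses; on Kubernetes these generally become the pods' CPU requests.
  .setConf("spark.driver.cores", "1")
  .setConf("spark.executor.cores", "2")
  // Hard CPU limits applied to the driver and executor pods.
  .setConf("spark.kubernetes.driver.limit.cores", "1")
  .setConf("spark.kubernetes.executor.limit.cores", "2")
  .startApplication()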


Re: spark structured streaming crash due to decompressing gzip file failure

2019-03-07 Thread Lian Jiang
Thanks, it worked.

On Thu, Mar 7, 2019 at 5:05 AM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi,
>
> In your spark-submit command, try using the below config property and see
> if this solves the problem.
>
> --conf spark.sql.files.ignoreCorruptFiles=true
>
> For me, this worked for ignoring empty or partially uploaded gzip files
> in an S3 bucket.
>
> Akshay Bhardwaj
> +91-97111-33849
>


Re: mapreduce.input.fileinputformat.split.maxsize not working for spark 2.4.0

2019-03-07 Thread Akshay Mendole
Hi,
 No, it is a Java application that uses the RDD APIs.
Thanks,
Akshay


On Mon, Feb 25, 2019 at 7:54 AM Manu Zhang  wrote:

> Is your application using the Spark SQL / DataFrame API? If so, please try
> setting
>
> spark.sql.files.maxPartitionBytes
>
> (which defaults to 128MB) to a value that matches the split size you want.
>
> Thanks,
> Manu Zhang
> On Feb 25, 2019, 2:58 AM +0800, Akshay Mendole ,
> wrote:
>
> Hi,
> We have dfs.blocksize configured to be 512MB, and we have some large
> files in HDFS that we want to process with a Spark application. We want to
> split the files into more input splits to optimise for memory, but the
> above-mentioned parameters are not working.
> The max and min split-size parameters below are configured to 50MB, yet a
> file as big as 500MB is read as a single split, while we expect it to be
> split into at least 10 input splits.
>
> SparkConf conf = new SparkConf().setAppName(jobName);
> SparkContext sparkContext = new SparkContext(conf);
> sparkContext.hadoopConfiguration()
>     .set("mapreduce.input.fileinputformat.split.maxsize", "5000");
> sparkContext.hadoopConfiguration()
>     .set("mapreduce.input.fileinputformat.split.minsize", "5000");
> JavaSparkContext sc = new JavaSparkContext(sparkContext);
> sc.hadoopConfiguration()
>     .set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec");
>
>
> Could you please suggest what could be wrong with my configuration?
>
> Thanks,
> Akshay
>
>
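
For reference, a minimal Scala sketch of the two knobs discussed in this thread, with a
hypothetical path and a hypothetical 64MB target: mapreduce.input.fileinputformat.split.maxsize
is honoured by the new Hadoop API input formats (org.apache.hadoop.mapreduce.lib.input) used via
newAPIHadoopFile, while DataFrame file reads are governed by spark.sql.files.maxPartitionBytes.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SparkSession

// Sketch only; the HDFS path and the 64MB target are placeholders.
val spark = SparkSession.builder().appName("split-size-example").getOrCreate()
val sc = spark.sparkContext

// RDD path (new Hadoop API): cap each input split at roughly 64MB.
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
  (64L * 1024 * 1024).toString)
val rdd = sc.newAPIHadoopFile("hdfs:///data/large-files",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
println(s"RDD partitions: ${rdd.getNumPartitions}")

// DataFrame path: the equivalent knob is spark.sql.files.maxPartitionBytes (128MB by default).
spark.conf.set("spark.sql.files.maxPartitionBytes", (64L * 1024 * 1024).toString)
val df = spark.read.text("hdfs:///data/large-files")
println(s"DataFrame partitions: ${df.rdd.getNumPartitions}")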


Re: spark structured streaming crash due to decompressing gzip file failure

2019-03-07 Thread Akshay Bhardwaj
Hi,

In your spark-submit command, try using the below config property and see
if this solves the problem.

--conf spark.sql.files.ignoreCorruptFiles=true

For me, this worked for ignoring empty or partially uploaded gzip files in
an S3 bucket.
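
If it helps, the same setting can also be applied from inside the job; below is a minimal
sketch of a structured streaming read with it enabled (the schema, columns and paths are
hypothetical placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

// Sketch only; schema and paths are placeholders.
val spark = SparkSession.builder().appName("jsonl-gz-stream").getOrCreate()

// Skip files that cannot be read (e.g. truncated or corrupt .gz archives)
// instead of failing the whole query.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val schema = new StructType().add("id", "string").add("value", "double")

val query = spark.readStream
  .schema(schema)
  .json("hdfs:///data/incoming")   // folder containing jsonl.gz files
  .writeStream
  .format("parquet")
  .option("path", "hdfs:///data/validated")
  .option("checkpointLocation", "hdfs:///checkpoints/jsonl-gz-stream")
  .start()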

Akshay Bhardwaj
+91-97111-33849


On Thu, Mar 7, 2019 at 11:28 AM Lian Jiang  wrote:

> Hi,
>
> I have a structured streaming job which listens to an HDFS folder
> containing jsonl.gz files. The job crashed due to this error:
>
> java.io.IOException: incorrect header check
> at
> org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native
> Method)
> at
> org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
> at
> org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
> at
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
> at
> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
> at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
> at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
> at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> at
> org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Is there a way to skip the gz files that cannot be decompressed? Exception
> handling does not seem to help. The only workaround I can think of is to
> decompress the gz files into another folder first and have the Spark
> streaming job listen to that new folder. But this workaround may not be
> better than using a non-structured streaming job to directly decompress
> the gz files, read the jsonl files, validate the records and write the
> validated records to Parquet.
>
> Any idea is highly appreciated!
>
>
>
>
>


[SparkSQL, user-defined Hadoop, K8s] Hadoop free spark on kubernetes => NoClassDefFound

2019-03-07 Thread Sommer Tobias
Hi all,

we are having problems using a custom Hadoop lib in a Spark image
when running it on a Kubernetes cluster while following the steps of the
documentation.

Details are in the description below.



Has anyone else had similar problems? Is there something missing in the setup
below?

Or is this a bug?



Hadoop free spark on kubernetes





Using custom Hadoop libraries in a Spark image does not work when following
the steps of the documentation (*) for running SparkPi on a Kubernetes cluster.





*Usage of hadoop free build:

https://spark.apache.org/docs/2.4.0/hadoop-provided.html





Steps:
1. Download Hadoop-free Spark: spark-2.4.0-bin-without-hadoop.tgz
2. Build a Spark image without Hadoop from it with docker-image-tool.sh
3. Create a Dockerfile that adds an image layer with a custom Hadoop (version 2.9.2)
   on top of the Spark-without-Hadoop image (see: Dockerfile and conf/spark-env.sh in the gist)
4. Use the custom-Hadoop Spark image to run the Spark examples (see: k8s submit command in the gist)
5. This produces a JNI error (see the error message in the gist); the expected result is the
   computation of pi.





See files in gist

https://gist.github.com/HectorOvid/c0bdad1b9dc8f64540b5b34e73f2a4a1





Regards,

Tobias Sommer
M.Sc. (Uni)
Team eso-IN-Swarm
Software Engineer


e.solutions GmbH
Despag-Str. 4a, 85055 Ingolstadt, Germany

Phone +49-8458-3332-1219
Fax +49-8458-3332-2219
tobias.som...@esolutions.de

Registered Office:
Despag-Str. 4a, 85055 Ingolstadt, Germany

e.solutions GmbH
Managing Directors Uwe Reder, Dr. Riclef Schmidt-Clausen
Register Court Ingolstadt HRB 5221



Hadoop free spark on kubernetes => NoClassDefFound

2019-03-07 Thread Sommer Tobias
Hi,

we are having problems using a custom Hadoop lib in a Spark image
when running it on a Kubernetes cluster while following the steps of the
documentation.

Details are in the description below.



Has anyone else had similar problems? Is there something missing in the setup
below?

Or is this a bug?







Hadoop free spark on kubernetes





Using custom Hadoop libraries in a Spark image does not work when following
the steps of the documentation (*) for running SparkPi on a Kubernetes cluster.





*Usage of hadoop free build:

https://spark.apache.org/docs/2.4.0/hadoop-provided.html





Steps:
1. Download Hadoop-free Spark: spark-2.4.0-bin-without-hadoop.tgz
2. Build a Spark image without Hadoop from it with docker-image-tool.sh
3. Create a Dockerfile that adds an image layer with a custom Hadoop
   on top of the Spark-without-Hadoop image (see: Dockerfile and conf/spark-env.sh below)
4. Use the custom-Hadoop Spark image to run the Spark examples (see: k8s submit command below)
5. This produces a JNI error (see the message below); the expected result is the computation
   of pi.









Dockerfile

# some spark base image built via:
#  $SPARK_HOME/bin/docker-image-tool.sh  -r  -t sometag build
#  $SPARK_HOME/bin/docker-image-tool.sh  -r  -t sometag push
#
# docker build this to: >> docker build -t reg../...:1.0.0 .
#
# use spark 2.4.0 without hadoop as base image
#
FROM registry/spark-without-hadoop:2.4.0

ENV SPARK_HOME /opt/spark

# setup custom hadoop
COPY libs/hadoop-2.9.2 /opt/hadoop
ENV HADOOP_HOME /opt/hadoop

COPY conf/spark-env.sh ${SPARK_HOME}/conf/spark-env.sh

WORKDIR /opt/spark/work-dir

conf/spark-env.sh

#!/usr/bin/env bash

# echo commands to the terminal output
set -ex

# With explicit path to 'hadoop' binary
export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)

echo $SPARK_DIST_CLASSPATH





submit command to kubernetes

$SPARK_HOME/bin/spark-submit \
  --master k8s://... \
  --name sparkpiexample-custom-hadoop-original \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=2 \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:-ResizePLAB" \
  --conf spark.kubernetes.memoryOverheadFactor=0.2 \
  --conf spark.kubernetes.container.image=registry/spark-custom-hadoop-original:1.0.0 \
  --conf spark.kubernetes.container.image.pullSecrets=... \
  --conf spark.kubernetes.container.image.pullPolicy=Always \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar





Retrieved error message:

++ id -u
+ myuid=0
++ id -g
+ mygid=0
+ set +e
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/ash
+ set -e
+ '[' -z root:x:0:0:root:/root:/bin/ash ']'
+ SPARK_K8S_CMD=driver
+ case "$SPARK_K8S_CMD" in
+ shift 1
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sort -t_ -k4 -n
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' -n '' ']'
+ PYSPARK_ARGS=
+ '[' -n '' ']'
+ R_ARGS=
+ '[' -n '' ']'
+ '[' '' == 2 ']'
+ '[' '' == 3 ']'
+ case "$SPARK_K8S_CMD" in
+ CMD=("$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")
+ exec /sbin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=100.96.6.123 --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class org.apache.spark.examples.SparkPi spark-internal

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/Logger
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.slf4j.Logger
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more




Regards,

Tobias Sommer
M.Sc. (Uni)
Team eso-IN-Swarm
Software Engineer


Re: spark df.write.partitionBy run very slow

2019-03-07 Thread JF Chen
Yes, I agree.

From the Spark UI I can confirm the data is not skewed. There is only about
100MB for each task, yet most tasks take several seconds to write the data
to HDFS while some tasks take minutes.

Regard,
Junfeng Chen


On Wed, Mar 6, 2019 at 2:39 PM Shyam P  wrote:

> Hi JF,
> Yes, first we should know the actual number of partitions the dataframe has
> and its record counts. Accordingly, we should try to have the data spread
> evenly across all partitions.
> It is always better to have: number of partitions = N * number of executors.
>
> "But the sequence of columns in partitionBy decides the directory hierarchy
> structure. I hope the sequence of columns does not change"; this is correct.
> Hence sometimes we should go with the bigger number first, then the lesser
> ones, i.e. more parent directories and fewer child directories. Tweak it
> and try.
>
> "some tasks in the write-to-HDFS stage cost much more time than others":
> maybe the data is skewed; it needs to be distributed evenly across all
> partitions.
>
> ~Shyam
>
> On Wed, Mar 6, 2019 at 8:33 AM JF Chen  wrote:
>
>> Hi Shyam
>> Thanks for your reply.
>> You mean that after knowing the number of partitions for column_a,
>> column_b and column_c, the sequence of columns in partitionBy should match
>> the order of their partition counts?
>> But the sequence of columns in partitionBy decides the directory hierarchy
>> structure. I hope the sequence of columns does not change.
>>
>> And I found one more strange thing: some tasks in the write-to-HDFS stage
>> take much more time than others, even though the amount of data written is
>> similar. How can I solve this?
>>
>> Regard,
>> Junfeng Chen
>>
>>
>> On Tue, Mar 5, 2019 at 3:05 PM Shyam P  wrote:
>>
>>> Hi JF,
>>> Try executing this before df.write:
>>>
>>> //count by partition_id
>>> import org.apache.spark.sql.functions.spark_partition_id
>>> df.groupBy(spark_partition_id).count.show()
>>>
>>> You will come to know how data has been partitioned inside df.
>>>
>>> A small trick we can apply here with partitionBy(column_a, column_b,
>>> column_c): make sure that (column_a partitions) > (column_b partitions) >
>>> (column_c partitions).
>>>
>>> Try this.
>>>
>>> Regards,
>>> Shyam
>>>
>>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen  wrote:
>>>
 I am trying to write data in a dataset to HDFS via
 df.write.partitionBy(column_a, column_b, column_c).parquet(output_path).
 However, it takes several minutes to write only hundreds of MB of data to
 HDFS.
 From this article, adding a repartition before the write should work. But
 if there is data skew, some tasks may take much longer than average, so it
 still takes a long time.
 How can I solve this problem? Thanks in advance!


 Regard,
 Junfeng Chen

>>>
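
A minimal sketch combining the suggestions in this thread (column names and paths are
hypothetical placeholders): inspect how rows are spread across the current partitions, then
repartition on the partitionBy columns before writing.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.spark_partition_id

// Sketch only; input/output paths and column names are placeholders.
val spark = SparkSession.builder().appName("partitionBy-write-example").getOrCreate()
val df = spark.read.parquet("hdfs:///data/input")

// 1. Inspect how rows are spread across the current partitions.
df.groupBy(spark_partition_id()).count().show()

// 2. Repartition on the same columns used in partitionBy so that each task
//    writes to only a few output directories, then write.
df.repartition(df("column_a"), df("column_b"), df("column_c"))
  .write
  .partitionBy("column_a", "column_b", "column_c")
  .parquet("hdfs:///data/output")

Repartitioning on the partitionBy columns reduces the number of output files per directory;
note that if those columns are heavily skewed, this step can itself create straggler tasks.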