Re: pyspark DataFrameWriter ignores customized settings?

2018-03-20 Thread Ryan Blue
To clarify what's going on here: dfs.blocksize and dfs.block.size set the
HDFS block size (the spark.hadoop. prefix adds the setting to the Hadoop
configuration). The Parquet "block size" is more accurately called the "row
group size", but it is set using the unfortunately-named property
parquet.block.size. I think the reason this was hard for anyone to answer
is that the HDFS block size was being set correctly, but the value was
really intended for Parquet's row group size.

HDFS doesn't know anything about Parquet's row group size and still splits
its blocks at the HDFS block size. Parquet tries to fit a whole number of
row groups into each HDFS block and will pad if necessary to avoid a row
group spanning multiple blocks. The Parquet community's recommendation for
row group size is to use a size that is large (tens of megabytes at a
minimum) and that evenly divides the HDFS block size (so a whole number of
row groups fits per block). The default of 128MB row groups has been
reasonable for all of the datasets I've tuned, but some use cases opt for
smaller values (16 or 32MB) to increase parallelism.
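
For concreteness, here is a minimal PySpark sketch of setting the two sizes
together so that the row groups evenly divide the HDFS block (the 128MB/32MB
values and the app name are just illustrative placeholders, not something
from the original report):

from pyspark import SparkConf
from pyspark.sql import SparkSession

hdfs_block_size = 128 * 1024 * 1024  # HDFS block size: 128MB
row_group_size = 32 * 1024 * 1024    # Parquet row group size: 32MB, divides 128MB evenly

conf = (
    SparkConf()
    .setAppName("parquet-row-group-example")  # placeholder app name
    .set("spark.hadoop.dfs.blocksize", str(hdfs_block_size))      # HDFS block size
    .set("spark.hadoop.parquet.block.size", str(row_group_size))  # Parquet row group size
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()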

rb



-- 
Ryan Blue
Software Engineer
Netflix


Re: pyspark DataFrameWriter ignores customized settings?

2018-03-16 Thread chhsiao1981
Hi all,

Found the answer at the following link:

https://forums.databricks.com/questions/918/how-to-set-size-of-parquet-output-files.html

I can successfully set the Parquet block size with
spark.hadoop.parquet.block.size.

The following is the sample code:

# init
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

block_size = 512 * 1024

conf = (
    SparkConf()
    .setAppName("myapp")
    .setMaster("spark://spark1:7077")
    .set("spark.cores.max", 20)
    .set("spark.executor.cores", 10)
    .set("spark.executor.memory", "10g")
    .set("spark.hadoop.parquet.block.size", str(block_size))
    .set("spark.hadoop.dfs.blocksize", str(block_size))
    .set("spark.hadoop.dfs.block.size", str(block_size))
    .set("spark.hadoop.dfs.namenode.fs-limits.min-block-size", str(131072))
)

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# create DataFrame
df_txt = spark.createDataFrame([{'temp': "hello"}, {'temp': "world"}, {'temp': "!"}])

# save using DataFrameWriter, resulting in a 512k block size
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')
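
To double-check the HDFS block size of the written part files, here is a
minimal sketch using the hdfs InsecureClient from the earlier test code
(assuming the WebHDFS endpoint at spark1:50070; 'blockSize' is the field
name in the WebHDFS FileStatus response):

from hdfs import InsecureClient

client = InsecureClient('http://spark1:50070')

# print the HDFS block size of every file the DataFrameWriter produced
for name in client.list('/tmp/temp_with_df'):
    status = client.status('/tmp/temp_with_df/' + name)
    print(name, status['blockSize'])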








Re: pyspark DataFrameWriter ignores customized settings?

2018-03-16 Thread chhsiao1981
Hi all,

Looks like it's a Parquet-specific issue.

I can successfully write with a 512k block size if I use df.write.csv() or
df.write.text().
(The csv write succeeds once I put hadoop-lzo-0.4.15-cdh5.13.0.jar into the
jars dir.)

Sample code:


from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from hdfs import InsecureClient

block_size = 512 * 1024

conf = (
    SparkConf()
    .setAppName("myapp")
    .setMaster("spark://spark1:7077")
    .set("spark.cores.max", 20)
    .set("spark.executor.cores", 10)
    .set("spark.executor.memory", "10g")
    .set("spark.hadoop.dfs.blocksize", str(block_size))
    .set("spark.hadoop.dfs.block.size", str(block_size))
    .set("spark.hadoop.dfs.namenode.fs-limits.min-block-size", str(131072))
)

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# create DataFrame
df_txt = spark.createDataFrame([{'temp': "hello"}, {'temp': "world"}, {'temp': "!"}])

# save using DataFrameWriter, resulting in a 128MB block size
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')

# save using DataFrameWriter.csv, resulting in a 512k block size
df_txt.write.mode('overwrite').csv('hdfs://spark1/tmp/temp_with_df_csv')

# save using DataFrameWriter.text, resulting in a 512k block size
df_txt.write.mode('overwrite').text('hdfs://spark1/tmp/temp_with_df_text')

# save using rdd, resulting in a 512k block size
client = InsecureClient('http://spark1:50070')
client.delete('/tmp/temp_with_rrd', recursive=True)
df_txt.rdd.saveAsTextFile('hdfs://spark1/tmp/temp_with_rrd')
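
To see the difference between the HDFS block size and Parquet's row group
size directly, one way is to inspect a written part file's footer with
pyarrow. A minimal sketch, assuming the part file is first downloaded
locally with the same InsecureClient (the part-file name below is a
placeholder; the actual names Spark generates include a UUID):

import pyarrow.parquet as pq
from hdfs import InsecureClient

client = InsecureClient('http://spark1:50070')

# download one Parquet part file locally (placeholder file name)
local_path = client.download('/tmp/temp_with_df/part-00000.parquet',
                             '/tmp/part-00000.parquet', overwrite=True)

# the footer metadata reports the number of row groups and their sizes
meta = pq.ParquetFile(local_path).metadata
print('row groups:', meta.num_row_groups)
for i in range(meta.num_row_groups):
    print('row group', i, 'bytes:', meta.row_group(i).total_byte_size)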






pyspark DataFrameWriter ignores customized settings?

2018-03-10 Thread Chuan-Heng Hsiao
Hi all,

I am using spark-2.2.1-bin-hadoop2.7 in standalone mode
(Python 3.5.2 on Ubuntu 16.04).
I intended to have a DataFrame written to HDFS with a customized block
size, but it failed.
However, the corresponding RDD can successfully write with the customized
block size.

Could you help me figure out the issue?

Best regards,
Hsiao



The following is the test code
(dfs.namenode.fs-limits.min-block-size has been set to 131072 in HDFS):


##
# init
##
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import hdfs
from hdfs import InsecureClient
import os

import numpy as np
import pandas as pd
import logging

os.environ['SPARK_HOME'] = '/opt/spark-2.2.1-bin-hadoop2.7'

block_size = 512 * 1024

conf = (
    SparkConf()
    .setAppName("myapp")
    .setMaster("spark://spark1:7077")
    .set("spark.cores.max", 20)
    .set("spark.executor.cores", 10)
    .set("spark.executor.memory", "10g")
    .set("spark.hadoop.dfs.blocksize", str(block_size))
    .set("spark.hadoop.dfs.block.size", str(block_size))
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.blocksize", block_size)
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.block.size", block_size)
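
For reference, the values can be read back from the effective Hadoop
configuration to confirm they were applied. A minimal sketch reusing the
_jsc handle from the setInt() calls above (note that _jsc is Spark's
internal JavaSparkContext handle, not a public API):

# sanity check: the settings should read back as '524288'
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
print(hadoop_conf.get("dfs.blocksize"))
print(hadoop_conf.get("dfs.block.size"))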

##
# main
##

# create DataFrame
df_txt = spark.createDataFrame([{'temp': "hello"}, {'temp': "world"}, {'temp': "!"}])

# save using DataFrameWriter, resulting in a 128MB block size
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')

# save using rdd, resulting in a 512k block size
client = InsecureClient('http://spark1:50070')
client.delete('/tmp/temp_with_rrd', recursive=True)
df_txt.rdd.saveAsTextFile('hdfs://spark1/tmp/temp_with_rrd')