On 28 Sep 2017, at 15:27, Daniel Siegmann <dsiegm...@securityscorecard.io> wrote:
>> Can you kindly explain how Spark uses parallelism for a bigger (say 1 GB) text file? Does it use InputFormat to create multiple splits, and does it create one partition per split? Also, in the case of S3 or NFS, how does the input split work? I understand that for HDFS, files are already pre-split, so Spark can use dfs.blocksize to determine partitions. But how does it work other than with HDFS?

> S3 is similar to HDFS, I think. I'm not sure off-hand how exactly it decides to split for the local filesystem, but it does. Maybe someone else will be able to explain the details.

HDFS files are split into blocks, each with a real block size and location, set when the file was written/copied. If you have a 100 MB file replicated on 3 machines with a block size of 64 MB, you will have two blocks for the file, 64 MB and 36 MB, with three replicas of each block. Blocks are placed across machines (normally two hosts on one rack and one on a different rack, which gives you better resilience to failures of rack switches). There's no attempt to colocate blocks of the same file, *except* that HDFS will attempt to write every block onto the host where the program generating the data is running. So, space permitting, if the 100 MB file is created on host 1, then host 1 will have block-1 replica-1 and block-2 replica-1, with the others scattered around the cluster.

The split calculation code is actually https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L386

Because the block size is fixed in HDFS, you get the block size used at creation time; different formats may provide their own split information independent of that block size, though. (This also means that if you have different block sizes for different files in the set of files you process, there may be different splits for each file, as well as different locations.) With HDFS replication, you get the bandwidth of all the hard disks serving up data.
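To make the 64 MB + 36 MB example concrete, here's a simplified Python sketch of the split logic in that FileInputFormat code: the split size is max(minSize, min(maxSize, blockSize)), and a 10% "slop" factor stops a tiny tail from getting its own split. It's an illustration of the idea, not the real Hadoop code.

```python
def compute_splits(file_size, block_size, min_size=1, max_size=2**63 - 1):
    """Simplified sketch of FileInputFormat.getSplits():
    chop a file into splits of splitSize bytes, where
    splitSize = max(minSize, min(maxSize, blockSize)),
    leaving the remainder as one final split if the leftover
    is no more than 10% larger than a split."""
    SPLIT_SLOP = 1.1  # same 10% slop factor Hadoop uses
    split_size = max(min_size, min(max_size, block_size))
    splits, remaining = [], file_size
    while remaining / split_size > SPLIT_SLOP:
        splits.append(split_size)
        remaining -= split_size
    if remaining > 0:
        splits.append(remaining)
    return splits

MB = 1024 * 1024
print(compute_splits(100 * MB, 64 * MB))  # → [67108864, 37748736], i.e. 64 MB + 36 MB
```

Note the slop: a 70 MB file with a 64 MB block size comes back as a single 70 MB split, because the 6 MB tail isn't worth a separate task.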
With a 100 MB file split in two, if those blocks were actually saved onto different physical hard disks (say SAS disks at 6 Gb/s), then you have 3 x 2 x 6 Gb/s of aggregate bandwidth, for a max of 36 Gb/s. (Of course, there's other work competing for disk IO; that's the maximum.) If Spark schedules the work on those machines and you have the Hadoop native libraries installed (i.e. you don't get told off in the logs for not having them), then the HDFS client running in the Spark processes can talk directly to the HDFS datanode and get a native OS file handle to read those blocks: there isn't even a network stack to interfere. If you are working with remote data, then the network slows things down.

The S3A client just makes things up: you can configure the settings to lie about block size. If you have 100 MB files and want to split the work five ways, then in that job set

  spark.hadoop.fs.s3a.block.size = 20971520

The other object stores have different options, but it's the same thing really. You get to choose, client-side, what Spark is told, which is then used by the driver to make its decisions about which splits to give to which workers for processing, the order, etc.

Unlike HDFS, the bandwidth you get off S3 for a single file is fixed, irrespective of how many blocks you tell the client there are. Declaring a lower block size and so allowing more workers at the data isn't going to guarantee more performance; you'll just be sharing the same IO rate. Though, talking to S3, a big factor in performance when working with the data is actually the cost of breaking and recreating HTTP connections, which happens a lot if you have seek-heavy code reading large files. And the columnar formats, ORC and Parquet, are seek-heavy, provided they aren't gzipped.
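Quick arithmetic check on why 20971520 gives five-way parallelism: 20971520 bytes is 20 MB, and planning splits of the 100 MB object with the same split loop (sketched here in Python, same 10% slop as FileInputFormat) yields five of them.

```python
MB = 1024 * 1024

def num_splits(file_size, declared_block_size, slop=1.1):
    """How many splits the driver plans, given the block size the
    S3A client declares (simplified FileInputFormat-style loop)."""
    splits, remaining = 0, file_size
    while remaining / declared_block_size > slop:
        splits += 1
        remaining -= declared_block_size
    return splits + (1 if remaining > 0 else 0)

# fs.s3a.block.size = 20971520 (20 MB) on a 100 MB object:
print(num_splits(100 * MB, 20971520))  # → 5
```

Remember, though: five tasks against one S3 object share the same fixed bandwidth; this buys you parallel *processing*, not parallel *IO*.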
Reading these files has pretty awful performance until you run Hadoop 2.8+ and tell S3A that you are doing random IO (which kills .gz reading, so use wisely):

  spark.hadoop.fs.s3a.experimental.fadvise = random

All this stuff and more is in the source files; don't be afraid to look into them to see what's going on. I always recommend starting with the stack traces you get when things aren't working right. If you are using S3, that's all in: https://github.com/apache/hadoop/tree/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a

-steve
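For reference, here's one hypothetical way to wire both of the options above into a PySpark job via `spark.hadoop.*` pass-through properties. This is a configuration sketch, not a runnable demo: it assumes pyspark plus the hadoop-aws module on the cluster, and the app name and bucket path are made up.

```python
from pyspark.sql import SparkSession

# Configuration sketch (needs a real Spark deployment with S3A on the classpath).
spark = (
    SparkSession.builder
    .appName("s3a-tuning-demo")                                   # illustrative name
    # Tell the S3A client to report 20 MB "blocks", so a 100 MB
    # object is planned as ~5 splits by the driver:
    .config("spark.hadoop.fs.s3a.block.size", str(20 * 1024 * 1024))
    # Hadoop 2.8+: random-IO mode for seek-heavy ORC/Parquet reads
    # (bad for .gz, which must be read straight through):
    .config("spark.hadoop.fs.s3a.experimental.fadvise", "random")
    .getOrCreate()
)

df = spark.read.parquet("s3a://my-bucket/some/table/")            # path is made up
```

The `spark.hadoop.` prefix is how Spark forwards arbitrary properties into the Hadoop configuration the S3A client reads.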