srowen closed pull request #22683: [SPARK-25696] The storage memory displayed on spark Application UI is… URL: https://github.com/apache/spark/pull/22683
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index e99136723f65b..0207f249f9aa0 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -87,7 +87,7 @@ objectFile <- function(sc, path, minPartitions = NULL) { #' in the list are split into \code{numSlices} slices and distributed to nodes #' in the cluster. #' -#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MB), the function +#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MiB), the function #' will write it to disk and send the file name to JVM. Also to make sure each slice is not #' larger than that limit, number of slices may be increased. #' diff --git a/R/pkg/R/mllib_tree.R b/R/pkg/R/mllib_tree.R index 0e60842dd44c8..9844061cfd074 100644 --- a/R/pkg/R/mllib_tree.R +++ b/R/pkg/R/mllib_tree.R @@ -157,7 +157,7 @@ print.summary.decisionTree <- function(x) { #' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1). #' Note: this setting will be ignored if the checkpoint directory is not #' set. -#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation. +#' @param maxMemoryInMB Maximum memory in MiB allocated to histogram aggregation. #' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with #' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching #' can speed up training of deeper trees. Users can set how often should the @@ -382,7 +382,7 @@ setMethod("write.ml", signature(object = "GBTClassificationModel", path = "chara #' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1). #' Note: this setting will be ignored if the checkpoint directory is not #' set. -#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation. +#' @param maxMemoryInMB Maximum memory in MiB allocated to histogram aggregation. #' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with #' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching #' can speed up training of deeper trees. Users can set how often should the @@ -588,7 +588,7 @@ setMethod("write.ml", signature(object = "RandomForestClassificationModel", path #' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1). #' Note: this setting will be ignored if the checkpoint directory is not #' set. -#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation. +#' @param maxMemoryInMB Maximum memory in MiB allocated to histogram aggregation. #' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with #' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching #' can speed up training of deeper trees. Users can set how often should the diff --git a/core/src/main/resources/org/apache/spark/ui/static/utils.js b/core/src/main/resources/org/apache/spark/ui/static/utils.js index deeafad4eb5f5..22985e31a7808 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/utils.js +++ b/core/src/main/resources/org/apache/spark/ui/static/utils.js @@ -40,9 +40,9 @@ function formatDuration(milliseconds) { function formatBytes(bytes, type) { if (type !== 'display') return bytes; if (bytes == 0) return '0.0 B'; - var k = 1000; + var k = 1024; var dm = 1; - var sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']; + var sizes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']; var i = Math.floor(Math.log(bytes) / Math.log(k)); return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i]; } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 845a3d5f6d6f9..696dafda6d1ec 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1043,7 +1043,7 @@ class SparkContext(config: SparkConf) extends Logging { // See SPARK-11227 for details. FileSystem.getLocal(hadoopConfiguration) - // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. + // A Hadoop configuration can be about 10 KiB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( @@ -2723,7 +2723,7 @@ object SparkContext extends Logging { val memoryPerSlaveInt = memoryPerSlave.toInt if (sc.executorMemory > memoryPerSlaveInt) { throw new SparkException( - "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format( + "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format( memoryPerSlaveInt, sc.executorMemory)) } diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 1e1c27c477877..72ca0fbe667e3 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -62,14 +62,14 @@ class KryoSerializer(conf: SparkConf) if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) { throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " + - s"2048 mb, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} mb.") + s"2048 MiB, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} MiB.") } private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2)) { throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " + - s"2048 mb, got: + $maxBufferSizeMb mb.") + s"2048 MiB, got: + $maxBufferSizeMb MiB.") } private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 227c9e734f0af..b4ea1ee950217 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1092,41 +1092,41 @@ private[spark] object Utils extends Logging { * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of mebibytes. */ def memoryStringToMb(str: String): Int = { - // Convert to bytes, rather than directly to MB, because when no units are specified the unit + // Convert to bytes, rather than directly to MiB, because when no units are specified the unit // is assumed to be bytes (JavaUtils.byteStringAsBytes(str) / 1024 / 1024).toInt } /** - * Convert a quantity in bytes to a human-readable string such as "4.0 MB". + * Convert a quantity in bytes to a human-readable string such as "4.0 MiB". */ def bytesToString(size: Long): String = bytesToString(BigInt(size)) def bytesToString(size: BigInt): String = { - val EB = 1L << 60 - val PB = 1L << 50 - val TB = 1L << 40 - val GB = 1L << 30 - val MB = 1L << 20 - val KB = 1L << 10 - - if (size >= BigInt(1L << 11) * EB) { + val EiB = 1L << 60 + val PiB = 1L << 50 + val TiB = 1L << 40 + val GiB = 1L << 30 + val MiB = 1L << 20 + val KiB = 1L << 10 + + if (size >= BigInt(1L << 11) * EiB) { // The number is too large, show it in scientific notation. BigDecimal(size, new MathContext(3, RoundingMode.HALF_UP)).toString() + " B" } else { val (value, unit) = { - if (size >= 2 * EB) { - (BigDecimal(size) / EB, "EB") - } else if (size >= 2 * PB) { - (BigDecimal(size) / PB, "PB") - } else if (size >= 2 * TB) { - (BigDecimal(size) / TB, "TB") - } else if (size >= 2 * GB) { - (BigDecimal(size) / GB, "GB") - } else if (size >= 2 * MB) { - (BigDecimal(size) / MB, "MB") - } else if (size >= 2 * KB) { - (BigDecimal(size) / KB, "KB") + if (size >= 2 * EiB) { + (BigDecimal(size) / EiB, "EiB") + } else if (size >= 2 * PiB) { + (BigDecimal(size) / PiB, "PiB") + } else if (size >= 2 * TiB) { + (BigDecimal(size) / TiB, "TiB") + } else if (size >= 2 * GiB) { + (BigDecimal(size) / GiB, "GiB") + } else if (size >= 2 * MiB) { + (BigDecimal(size) / MiB, "MiB") + } else if (size >= 2 * KiB) { + (BigDecimal(size) / KiB, "KiB") } else { (BigDecimal(size), "B") } @@ -1157,7 +1157,7 @@ private[spark] object Utils extends Logging { } /** - * Convert a quantity in megabytes to a human-readable string such as "4.0 MB". + * Convert a quantity in megabytes to a human-readable string such as "4.0 MiB". */ def megabytesToString(megabytes: Long): String = { bytesToString(megabytes * 1024L * 1024L) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 21f481d477242..3e1a3d4f73069 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -244,7 +244,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { val newConf = new SparkConf newConf.set("spark.rpc.message.maxSize", "1") newConf.set("spark.rpc.askTimeout", "1") // Fail fast - newConf.set("spark.shuffle.mapOutput.minSizeForBroadcast", "10240") // 10 KB << 1MB framesize + newConf.set("spark.shuffle.mapOutput.minSizeForBroadcast", "10240") // 10 KiB << 1MiB framesize // needs TorrentBroadcast so need a SparkContext withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc => diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 467e49026a029..8af53274d9b2f 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -75,7 +75,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { val thrown3 = intercept[IllegalArgumentException](newKryoInstance(conf, "2g", "3g")) assert(thrown3.getMessage.contains(kryoBufferProperty)) assert(!thrown3.getMessage.contains(kryoBufferMaxProperty)) - // test configuration with mb is supported properly + // test configuration with MiB is supported properly newKryoInstance(conf, "8m", "9m") } diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index 959cf58fa0536..6f60b08088cd1 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -128,7 +128,7 @@ class DiskStoreSuite extends SparkFunSuite { assert(e.getMessage === s"requirement failed: can't create a byte buffer of size ${blockData.size}" + - " since it exceeds 10.0 KB.") + " since it exceeds 10.0 KiB.") } test("block data encryption") { diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 901a724da8a1b..b2ff1cce3eb0b 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -133,7 +133,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(Utils.byteStringAsBytes("1p") === ByteUnit.PiB.toBytes(1)) // Overflow handling, 1073741824p exceeds Long.MAX_VALUE if converted straight to Bytes - // This demonstrates that we can have e.g 1024^3 PB without overflowing. + // This demonstrates that we can have e.g 1024^3 PiB without overflowing. assert(Utils.byteStringAsGb("1073741824p") === ByteUnit.PiB.toGiB(1073741824)) assert(Utils.byteStringAsMb("1073741824p") === ByteUnit.PiB.toMiB(1073741824)) @@ -149,7 +149,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { // Test overflow exception intercept[IllegalArgumentException] { - // This value exceeds Long.MAX when converted to TB + // This value exceeds Long.MAX when converted to TiB ByteUnit.PiB.toTiB(9223372036854775807L) } @@ -189,13 +189,13 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { test("bytesToString") { assert(Utils.bytesToString(10) === "10.0 B") assert(Utils.bytesToString(1500) === "1500.0 B") - assert(Utils.bytesToString(2000000) === "1953.1 KB") - assert(Utils.bytesToString(2097152) === "2.0 MB") - assert(Utils.bytesToString(2306867) === "2.2 MB") - assert(Utils.bytesToString(5368709120L) === "5.0 GB") - assert(Utils.bytesToString(5L * (1L << 40)) === "5.0 TB") - assert(Utils.bytesToString(5L * (1L << 50)) === "5.0 PB") - assert(Utils.bytesToString(5L * (1L << 60)) === "5.0 EB") + assert(Utils.bytesToString(2000000) === "1953.1 KiB") + assert(Utils.bytesToString(2097152) === "2.0 MiB") + assert(Utils.bytesToString(2306867) === "2.2 MiB") + assert(Utils.bytesToString(5368709120L) === "5.0 GiB") + assert(Utils.bytesToString(5L * (1L << 40)) === "5.0 TiB") + assert(Utils.bytesToString(5L * (1L << 50)) === "5.0 PiB") + assert(Utils.bytesToString(5L * (1L << 60)) === "5.0 EiB") assert(Utils.bytesToString(BigInt(1L << 11) * (1L << 60)) === "2.36E+21 B") } diff --git a/docs/configuration.md b/docs/configuration.md index 9abbb3f634900..ff9b802617f08 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1384,14 +1384,14 @@ Apart from these, the following properties are also available, and may be useful </tr> <tr> <td><code>spark.files.maxPartitionBytes</code></td> - <td>134217728 (128 MB)</td> + <td>134217728 (128 MiB)</td> <td> The maximum number of bytes to pack into a single partition when reading files. </td> </tr> <tr> <td><code>spark.files.openCostInBytes</code></td> - <td>4194304 (4 MB)</td> + <td>4194304 (4 MiB)</td> <td> The estimated cost to open a file, measured by the number of bytes could be scanned at the same time. This is used when putting multiple files into a partition. It is better to overestimate, @@ -1445,7 +1445,7 @@ Apart from these, the following properties are also available, and may be useful <td><code>spark.rpc.message.maxSize</code></td> <td>128</td> <td> - Maximum message size (in MB) to allow in "control plane" communication; generally only applies to map + Maximum message size (in MiB) to allow in "control plane" communication; generally only applies to map output size information sent between executors and the driver. Increase this if you are running jobs with many thousands of map and reduce tasks and see messages about the RPC message size. </td> diff --git a/docs/hardware-provisioning.md b/docs/hardware-provisioning.md index 896f9302ef300..29876a51b2804 100644 --- a/docs/hardware-provisioning.md +++ b/docs/hardware-provisioning.md @@ -37,7 +37,7 @@ use the same disks as HDFS. # Memory -In general, Spark can run well with anywhere from **8 GB to hundreds of gigabytes** of memory per +In general, Spark can run well with anywhere from **8 GiB to hundreds of gigabytes** of memory per machine. In all cases, we recommend allocating only at most 75% of the memory for Spark; leave the rest for the operating system and buffer cache. @@ -47,7 +47,7 @@ Storage tab of Spark's monitoring UI (`http://<driver-node>:4040`) to see its si Note that memory usage is greatly affected by storage level and serialization format -- see the [tuning guide](tuning.html) for tips on how to reduce it. -Finally, note that the Java VM does not always behave well with more than 200 GB of RAM. If you +Finally, note that the Java VM does not always behave well with more than 200 GiB of RAM. If you purchase machines with more RAM than this, you can run _multiple worker JVMs per node_. In Spark's [standalone mode](spark-standalone.html), you can set the number of workers per node with the `SPARK_WORKER_INSTANCES` variable in `conf/spark-env.sh`, and the number of cores diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index ec13b81f85557..281755f4cea8f 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -149,7 +149,7 @@ These parameters may be tuned. Be careful to validate on held-out test data whe * Note that the `maxBins` parameter must be at least the maximum number of categories `$M$` for any categorical feature. * **`maxMemoryInMB`**: Amount of memory to be used for collecting sufficient statistics. - * The default value is conservatively chosen to be 256 MB to allow the decision algorithm to work in most scenarios. Increasing `maxMemoryInMB` can lead to faster training (if the memory is available) by allowing fewer passes over the data. However, there may be decreasing returns as `maxMemoryInMB` grows since the amount of communication on each iteration can be proportional to `maxMemoryInMB`. + * The default value is conservatively chosen to be 256 MiB to allow the decision algorithm to work in most scenarios. Increasing `maxMemoryInMB` can lead to faster training (if the memory is available) by allowing fewer passes over the data. However, there may be decreasing returns as `maxMemoryInMB` grows since the amount of communication on each iteration can be proportional to `maxMemoryInMB`. * *Implementation details*: For faster processing, the decision tree algorithm collects statistics about groups of nodes to split (rather than 1 node at a time). The number of nodes which can be handled in one group is determined by the memory requirements (which vary per features). The `maxMemoryInMB` parameter specifies the memory limit in terms of megabytes which each worker can use for these statistics. * **`subsamplingRate`**: Fraction of the training data used for learning the decision tree. This parameter is most relevant for training ensembles of trees (using [`RandomForest`](api/scala/index.html#org.apache.spark.mllib.tree.RandomForest$) and [`GradientBoostedTrees`](api/scala/index.html#org.apache.spark.mllib.tree.GradientBoostedTrees)), where it can be useful to subsample the original data. For training a single decision tree, this parameter is less useful since the number of training instances is generally not the main constraint. diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index b3ba4b255b71a..968d668e2c93a 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -437,7 +437,7 @@ See the [configuration page](configuration.html) for information on Spark config <td><code>spark.mesos.executor.memoryOverhead</code></td> <td>executor memory * 0.10, with minimum of 384</td> <td> - The amount of additional memory, specified in MB, to be allocated per executor. By default, + The amount of additional memory, specified in MiB, to be allocated per executor. By default, the overhead will be larger of either 384 or 10% of <code>spark.executor.memory</code>. If set, the final overhead will be this value. </td> diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 49ef2e1ce2a1b..672a4d0f3199a 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -60,7 +60,7 @@ Finally, the following configuration options can be passed to the master and wor </tr> <tr> <td><code>-m MEM</code>, <code>--memory MEM</code></td> - <td>Total amount of memory to allow Spark applications to use on the machine, in a format like 1000M or 2G (default: your machine's total RAM minus 1 GB); only on worker</td> + <td>Total amount of memory to allow Spark applications to use on the machine, in a format like 1000M or 2G (default: your machine's total RAM minus 1 GiB); only on worker</td> </tr> <tr> <td><code>-d DIR</code>, <code>--work-dir DIR</code></td> @@ -128,7 +128,7 @@ You can optionally configure the cluster further by setting environment variable </tr> <tr> <td><code>SPARK_WORKER_MEMORY</code></td> - <td>Total amount of memory to allow Spark applications to use on the machine, e.g. <code>1000m</code>, <code>2g</code> (default: total memory minus 1 GB); note that each application's <i>individual</i> memory is configured using its <code>spark.executor.memory</code> property.</td> + <td>Total amount of memory to allow Spark applications to use on the machine, e.g. <code>1000m</code>, <code>2g</code> (default: total memory minus 1 GiB); note that each application's <i>individual</i> memory is configured using its <code>spark.executor.memory</code> property.</td> </tr> <tr> <td><code>SPARK_WORKER_PORT</code></td> diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index 6a52e8a7b0ebd..4a1812bbb40a2 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -248,5 +248,5 @@ de-aggregate records during consumption. - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency. #### Kinesis retry configuration - - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit `ProvisionedThroughputExceededException`'s, when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms". + - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit `ProvisionedThroughputExceededException`'s, when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MiB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms". - `spark.streaming.kinesis.retry.maxAttempts` : Max number of retries for Kinesis fetches. This config can also be used to tackle the Kinesis `ProvisionedThroughputExceededException`'s in scenarios mentioned above. It can be increased to have more number of retries for Kinesis reads. Default is 3. diff --git a/docs/tuning.md b/docs/tuning.md index cd0f9cd081369..43acacb98cbf9 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -115,7 +115,7 @@ variety of workloads without requiring user expertise of how memory is divided i Although there are two relevant configurations, the typical user should not need to adjust them as the default values are applicable to most workloads: -* `spark.memory.fraction` expresses the size of `M` as a fraction of the (JVM heap space - 300MB) +* `spark.memory.fraction` expresses the size of `M` as a fraction of the (JVM heap space - 300MiB) (default 0.6). The rest of the space (40%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records. @@ -147,7 +147,7 @@ pointer-based data structures and wrapper objects. There are several ways to do Java standard library. 2. Avoid nested structures with a lot of small objects and pointers when possible. 3. Consider using numeric IDs or enumeration objects instead of strings for keys. -4. If you have less than 32 GB of RAM, set the JVM flag `-XX:+UseCompressedOops` to make pointers be +4. If you have less than 32 GiB of RAM, set the JVM flag `-XX:+UseCompressedOops` to make pointers be four bytes instead of eight. You can add these options in [`spark-env.sh`](configuration.html#environment-variables). @@ -224,8 +224,8 @@ temporary objects created during task execution. Some steps which may be useful * As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the - size of the block. So if we wish to have 3 or 4 tasks' worth of working space, and the HDFS block size is 128 MB, - we can estimate size of Eden to be `4*3*128MB`. + size of the block. So if we wish to have 3 or 4 tasks' worth of working space, and the HDFS block size is 128 MiB, + we can estimate size of Eden to be `4*3*128MiB`. * Monitor how the frequency and time taken by garbage collection changes with the new settings. @@ -267,7 +267,7 @@ available in `SparkContext` can greatly reduce the size of each serialized task, of launching a job over a cluster. If your tasks use any large object from the driver program inside of them (e.g. a static lookup table), consider turning it into a broadcast variable. Spark prints the serialized size of each task on the master, so you can look at that to -decide whether your tasks are too large; in general tasks larger than about 20 KB are probably +decide whether your tasks are too large; in general tasks larger than about 20 KiB are probably worth optimizing. ## Data Locality diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index e58860fea97d0..e32d615af2a47 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -322,7 +322,7 @@ class BlockMatrix @Since("1.3.0") ( val m = numRows().toInt val n = numCols().toInt val mem = m * n / 125000 - if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!") + if (mem > 500) logWarning(s"Storing this matrix will require $mem MiB of memory!") val localBlocks = blocks.collect() val values = new Array[Double](m * n) localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) => diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala index 37eb794b0c5c9..6250b0363ee3b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala @@ -190,7 +190,7 @@ class GradientDescentClusterSuite extends SparkFunSuite with LocalClusterSparkCo iter.map(i => (1.0, Vectors.dense(Array.fill(n)(random.nextDouble())))) }.cache() // If we serialize data directly in the task closure, the size of the serialized task would be - // greater than 1MB and hence Spark would throw an error. + // greater than 1MiB and hence Spark would throw an error. val (weights, loss) = GradientDescent.runMiniBatchSGD( points, new LogisticGradient, diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 8bd6897df925f..b6e17cab44e9c 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -127,7 +127,7 @@ def __new__(cls, mean, confidence, low, high): def _parse_memory(s): """ Parse a memory string in the format supported by Java (e.g. 1g, 200m) and - return the value in MB + return the value in MiB >>> _parse_memory("256m") 256 diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index bd0ac0039ffe1..5d2d63850e9b2 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -37,7 +37,7 @@ process = None def get_used_memory(): - """ Return the used memory in MB """ + """ Return the used memory in MiB """ global process if process is None or process._pid != os.getpid(): process = psutil.Process(os.getpid()) @@ -50,7 +50,7 @@ def get_used_memory(): except ImportError: def get_used_memory(): - """ Return the used memory in MB """ + """ Return the used memory in MiB """ if platform.system() == 'Linux': for line in open('/proc/self/status'): if line.startswith('VmRSS:'): diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 9497530805c1a..d37d0d66d8ae2 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -127,7 +127,7 @@ private[yarn] class YarnAllocator( private var numUnexpectedContainerRelease = 0L private val containerIdToExecutorId = new HashMap[ContainerId, String] - // Executor memory in MB. + // Executor memory in MiB. protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt // Additional memory overhead. protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala index 8818d0135b297..b7ce367230810 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala @@ -160,7 +160,7 @@ class NullExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(AtLeastNNonNulls(4, nullOnly), false, EmptyRow) } - test("Coalesce should not throw 64kb exception") { + test("Coalesce should not throw 64KiB exception") { val inputs = (1 to 2500).map(x => Literal(s"x_$x")) checkEvaluation(Coalesce(inputs), "x_1") } @@ -171,7 +171,7 @@ class NullExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { assert(ctx.inlinedMutableStates.size == 1) } - test("AtLeastNNonNulls should not throw 64kb exception") { + test("AtLeastNNonNulls should not throw 64KiB exception") { val inputs = (1 to 4000).map(x => Literal(s"x_$x")) checkEvaluation(AtLeastNNonNulls(1, inputs), true) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala index d0604b8eb7675..94e251d90bcfa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala @@ -128,7 +128,7 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { } } - test("SPARK-16845: GeneratedClass$SpecificOrdering grows beyond 64 KB") { + test("SPARK-16845: GeneratedClass$SpecificOrdering grows beyond 64 KiB") { val sortOrder = Literal("abc").asc // this is passing prior to SPARK-16845, and it should also be passing after SPARK-16845 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala index f5d93ee5fa914..e4ec76f0b9a1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala @@ -73,14 +73,14 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan} * greater than the target size. * * For example, we have two stages with the following pre-shuffle partition size statistics: - * stage 1: [100 MB, 20 MB, 100 MB, 10MB, 30 MB] - * stage 2: [10 MB, 10 MB, 70 MB, 5 MB, 5 MB] - * assuming the target input size is 128 MB, we will have four post-shuffle partitions, + * stage 1: [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB] + * stage 2: [10 MiB, 10 MiB, 70 MiB, 5 MiB, 5 MiB] + * assuming the target input size is 128 MiB, we will have four post-shuffle partitions, * which are: - * - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MB) - * - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MB) - * - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MB) - * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB) + * - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MiB) + * - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MiB) + * - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB) + * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB) */ class ExchangeCoordinator( advisoryTargetPostShuffleInputSize: Long, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index 27bed1137e5b3..82973307feef3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -44,7 +44,7 @@ case class WindowInPandasExec( override def requiredChildDistribution: Seq[Distribution] = { if (partitionSpec.isEmpty) { - // Only show warning when the number of bytes is larger than 100 MB? + // Only show warning when the number of bytes is larger than 100 MiB? logWarning("No Partition Defined for Window operation! Moving all data to a single " + "partition, this can cause serious performance degradation.") AllTuples :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index fede0f3e92d67..729b8bdb3dae8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -90,7 +90,7 @@ case class WindowExec( override def requiredChildDistribution: Seq[Distribution] = { if (partitionSpec.isEmpty) { - // Only show warning when the number of bytes is larger than 100 MB? + // Only show warning when the number of bytes is larger than 100 MiB? logWarning("No Partition Defined for Window operation! Moving all data to a single " + "partition, this can cause serious performance degradation.") AllTuples :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index fc3faa08d55f4..b51c51e663503 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1904,7 +1904,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val e = intercept[SparkException] { df.filter(filter).count() }.getMessage - assert(e.contains("grows beyond 64 KB")) + assert(e.contains("grows beyond 64 KiB")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index cb562d65b6147..02dc32d5f90ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -227,12 +227,12 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared BigInt(0) -> (("0.0 B", "0")), BigInt(100) -> (("100.0 B", "100")), BigInt(2047) -> (("2047.0 B", "2.05E+3")), - BigInt(2048) -> (("2.0 KB", "2.05E+3")), - BigInt(3333333) -> (("3.2 MB", "3.33E+6")), - BigInt(4444444444L) -> (("4.1 GB", "4.44E+9")), - BigInt(5555555555555L) -> (("5.1 TB", "5.56E+12")), - BigInt(6666666666666666L) -> (("5.9 PB", "6.67E+15")), - BigInt(1L << 10 ) * (1L << 60) -> (("1024.0 EB", "1.18E+21")), + BigInt(2048) -> (("2.0 KiB", "2.05E+3")), + BigInt(3333333) -> (("3.2 MiB", "3.33E+6")), + BigInt(4444444444L) -> (("4.1 GiB", "4.44E+9")), + BigInt(5555555555555L) -> (("5.1 TiB", "5.56E+12")), + BigInt(6666666666666666L) -> (("5.9 PiB", "6.67E+15")), + BigInt(1L << 10 ) * (1L << 60) -> (("1024.0 EiB", "1.18E+21")), BigInt(1L << 11) * (1L << 60) -> (("2.36E+21 B", "2.36E+21")) ) numbers.foreach { case (input, (expectedSize, expectedRows)) => ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org