W.r.t. the effective storage level log, here is the JIRA which introduced it (a sketch of the relevant check follows, then the quoted thread):
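For context, that warning is produced by WriteAheadLogBasedBlockHandler, which rewrites the user-supplied storage level whenever the receiver WAL is enabled. A simplified sketch of that logic (paraphrased from the Spark source for illustration, not a verbatim copy):

    import org.apache.spark.storage.StorageLevel

    // Simplified sketch of the override in
    // org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler
    // (paraphrased, not the upstream code verbatim). With the WAL enabled,
    // blocks are forced into serialized form and replication is dropped to 1,
    // because the WAL itself provides the durability replication would add.
    def effectiveStorageLevel(userLevel: StorageLevel): StorageLevel = {
      if (userLevel.deserialized) {
        // First WARN: serialization is forced.
        println("WARN: Storage level serialization is unnecessary when " +
          "write ahead log is enabled, change to serialization false")
      }
      if (userLevel.replication > 1) {
        // Second WARN: replication is dropped to 1, which is why all the
        // received blocks (and hence the tasks) land on one receiver executor.
        println(s"WARN: Storage level replication ${userLevel.replication} is " +
          "unnecessary when write ahead log is enabled, change to replication 1")
      }
      StorageLevel(userLevel.useDisk, userLevel.useMemory, userLevel.useOffHeap,
        deserialized = false, replication = 1)
    }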
[SPARK-4671][Streaming] Do not replicate streaming block when WAL is enabled

On Wed, Apr 13, 2016 at 7:43 AM, Patrick McGloin <mcgloin.patr...@gmail.com> wrote:
> Hi all,
>
> If I am using a Custom Receiver with Storage Level set to
> StorageLevel.MEMORY_ONLY_SER_2 and the WAL enabled I get this Warning in the logs:
>
> 16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: Storage level
> replication 2 is unnecessary when write ahead log is enabled, change to
> replication 1
> 16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: User defined storage
> level StorageLevel(false, true, false, false, 2) is changed to effective
> storage level StorageLevel(false, true, false, false, 1) when write ahead log
> is enabled
>
> My application is running on 4 Executors with 4 cores each, and 1 Receiver.
> Because the data is not replicated, the processing runs on only one Executor:
>
> [image: Inline images 1]
>
> Instead of 16 cores processing the Streaming data, only 4 are being used.
>
> We cannot repartition the DStream to distribute data to more Executors,
> since if you call repartition on an RDD which is only located on one node,
> the new partitions are only created on that node, which doesn't help. This
> theory that repartitioning doesn't help can be tested with this simple
> example, which tries to go from one partition on a single node to many
> partitions on many nodes. What you find when you look at the
> multiplePartitions RDD in the UI is that its 6 partitions are all on the
> same Executor.
>
> scala> val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 4).cache.setName("rdd")
> rdd: org.apache.spark.rdd.RDD[Int] = rdd ParallelCollectionRDD[0] at
> parallelize at <console>:27
>
> scala> rdd.count()
> res0: Long = 6
>
> scala> val singlePartition =
> rdd.repartition(1).cache.setName("singlePartition")
> singlePartition: org.apache.spark.rdd.RDD[Int] = singlePartition
> MapPartitionsRDD[4] at repartition at <console>:29
>
> scala> singlePartition.count()
> res1: Long = 6
>
> scala> val multiplePartitions =
> singlePartition.repartition(6).cache.setName("multiplePartitions")
> multiplePartitions: org.apache.spark.rdd.RDD[Int] = multiplePartitions
> MapPartitionsRDD[8] at repartition at <console>:31
>
> scala> multiplePartitions.count()
> res2: Long = 6
>
> Am I correct in my use of repartition, i.e. that the data does not get
> shuffled if it is all on one Executor?
>
> Shouldn't I be allowed to set the Receiver replication factor to two when
> the WAL is enabled, so that multiple Executors can work on the Streaming
> input data?
>
> We will look into creating 4 Receivers so that the data gets distributed
> more evenly. But won't that "waste" 4 cores in our example, where one
> would do?
>
> Best regards,
> Patrick
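On the multiple-receivers idea at the end of the mail: the usual pattern from the Spark Streaming programming guide is to start several receiver streams and union them before any processing, so that ingestion is spread across executors. A rough sketch, where MyCustomReceiver is a stand-in for your custom Receiver class and ssc is an existing StreamingContext:

    import org.apache.spark.storage.StorageLevel

    // Sketch only: MyCustomReceiver is a placeholder for your Receiver subclass.
    val numReceivers = 4
    val streams = (1 to numReceivers).map { _ =>
      ssc.receiverStream(new MyCustomReceiver(StorageLevel.MEMORY_ONLY_SER))
    }
    // Union the per-receiver streams so downstream code sees a single DStream.
    val unified = ssc.union(streams)
    unified.foreachRDD { rdd => println(s"Batch size: ${rdd.count()}") }

As the mail notes, each receiver does occupy one core for the lifetime of the stream, so this trades those cores for ingestion parallelism.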