Regarding the effective storage level log message, here is the JIRA which
introduced it:

[SPARK-4671][Streaming]Do not replicate streaming block when WAL is enabled
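
For illustration, the adjustment reported in the warnings quoted below can be modeled roughly as follows. This is only a sketch based on what the log lines show, not the actual Spark source: LevelModel and WalLevelSketch are hypothetical names, and the field order is assumed to match the printed StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication).

```scala
// Hypothetical stand-in for Spark's StorageLevel, NOT the real class.
// Assumed field order, matching the log output:
// (useDisk, useMemory, useOffHeap, deserialized, replication).
final case class LevelModel(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int)

object WalLevelSketch {
  // Models only what the quoted warnings report: with the WAL enabled,
  // the requested replication factor is replaced by 1.
  def effective(requested: LevelModel): LevelModel =
    requested.copy(replication = 1)

  def main(args: Array[String]): Unit = {
    // Corresponds to StorageLevel(false, true, false, false, 2) in the log.
    val requested = LevelModel(false, true, false, false, 2)
    println(s"requested: $requested")
    println(s"effective: ${effective(requested)}")
  }
}
```

Under this model, the user-defined level is never rejected; only the replication factor is overridden, which matches the "changed to effective storage level" wording of the warning.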

On Wed, Apr 13, 2016 at 7:43 AM, Patrick McGloin <mcgloin.patr...@gmail.com>
wrote:

> Hi all,
>
> If I am using a Custom Receiver with the Storage Level set to
> StorageLevel.MEMORY_ONLY_SER_2 and the WAL enabled, I get this warning in
> the logs:
>
> 16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: Storage level 
> replication 2 is unnecessary when write ahead log is enabled, change to 
> replication 1
> 16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: User defined storage 
> level StorageLevel(false, true, false, false, 2) is changed to effective 
> storage level StorageLevel(false, true, false, false, 1) when write ahead log 
> is enabled
>
>
> My application is running on 4 Executors with 4 cores each, and 1
> Receiver.  Because the data is not replicated the processing runs on only
> one Executor:
>
> [image: Inline images 1]
>
> Instead of 16 cores processing the Streaming data only 4 are being used.
>
> We cannot repartition the DStream to distribute data to more Executors,
> since if you call repartition on an RDD which is located on only one node,
> the new partitions are created only on that node, which doesn't help.  This
> theory that repartitioning doesn't help can be tested with this simple
> example, which tries to go from one partition on a single node to many on
> many nodes.  What you find when you look at the multiplePartitions RDD
> in the UI is that its 6 partitions are on the same Executor.
>
> scala> val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 4).cache.setName("rdd")
> rdd: org.apache.spark.rdd.RDD[Int] = rdd ParallelCollectionRDD[0] at 
> parallelize at <console>:27
>
> scala> rdd.count()
> res0: Long = 6
>
> scala> val singlePartition = rdd.repartition(1).cache.setName("singlePartition")
> singlePartition: org.apache.spark.rdd.RDD[Int] = singlePartition 
> MapPartitionsRDD[4] at repartition at <console>:29
>
> scala> singlePartition.count()
> res1: Long = 6
>
> scala> val multiplePartitions = singlePartition.repartition(6).cache.setName("multiplePartitions")
> multiplePartitions: org.apache.spark.rdd.RDD[Int] = multiplePartitions 
> MapPartitionsRDD[8] at repartition at <console>:31
>
> scala> multiplePartitions.count()
> res2: Long = 6
>
> Am I correct about repartition, that the data does not get shuffled if
> it is all on one Executor?
>
> Shouldn't I be allowed to set the Receiver replication factor to two when the 
> WAL is enabled so that multiple Executors can work on the Streaming input 
> data?
>
> We will look into creating 4 Receivers so that the data gets distributed
> more evenly.  But won't that "waste" 4 cores in our example, where one
> would do?
>
> Best regards,
> Patrick
