To force one instance per executor, you could explicitly subclass FlatMapFunction and have it lazily create your parser in the subclass constructor. You might also want to try RDD#mapPartitions() (instead of RDD#flatMap()) if you want one instance per partition. This approach worked well for me when I had a flat-map function that used non-serializable native code/objects.
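A minimal sketch of both patterns, assuming a hypothetical non-serializable LogParser class with a parseLine(String) method (names are illustrative, not from the original program):

```scala
// One instance per partition: construct the parser inside mapPartitions,
// so it is created on the executor and never serialized with the closure.
val parsed = sc.textFile(inputPath).mapPartitions { lines =>
  val parser = new LogParser()                 // built once per partition
  lines.flatMap(line => parser.parseLine(line))
}

// One instance per executor (JVM): hold the parser in a lazy val on an
// object; each executor JVM initializes it at most once, on first use.
object ParserHolder {
  lazy val parser = new LogParser()
}
val parsed2 = sc.textFile(inputPath)
  .flatMap(line => ParserHolder.parser.parseLine(line))
```

The second variant also lets an executor-local LRU cache inside the parser be shared by all tasks running in that JVM, which seems to be what the original poster wants.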
FWIW, RDD#flatMap() does not appear to have changed from 1.1 to 1.2 (though master has a slight refactor). Agree it's worth checking the number of partitions in your 1.1 vs. 1.2 test.

On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO <raofeng...@gmail.com> wrote:

> The LogParser instance is not serializable, and thus cannot be a broadcast.
>
> What's worse, it contains an LRU cache, which is essential to the performance and which we would like to share among all the tasks on the same node.
>
> If that is the case, what's the recommended way to share a variable among all the tasks within the same executor?
>
> 2015-01-21 15:04 GMT+08:00 Davies Liu <dav...@databricks.com>:
>
>> Maybe some change related to serializing the closure causes LogParser to no longer be a singleton, so it is initialized for every task.
>>
>> Could you change it to a broadcast?
>>
>> On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO <raofeng...@gmail.com> wrote:
>> > We are currently migrating from Spark 1.1 to Spark 1.2, but found the program 3x slower, with nothing else changed.
>> > Note: our program on Spark 1.1 has successfully processed a whole year of data, quite stably.
>> >
>> > The main script is as below:
>> >
>> > sc.textFile(inputPath)
>> >   .flatMap(line => LogParser.parseLine(line))
>> >   .groupByKey(new HashPartitioner(numPartitions))
>> >   .mapPartitionsWithIndex(...)
>> >   .foreach(_ => {})
>> >
>> > where LogParser is a singleton which may take some time to initialize and is shared across the executor.
>> >
>> > The flatMap stage is 3x slower.
>> >
>> > We tried to change spark.shuffle.manager back to hash, and spark.shuffle.blockTransferService back to nio, but that didn't help.
>> >
>> > Could somebody explain possible causes, or what we should test or change to find out?
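On the partition-count point: a quick way to compare the two versions is to inspect the input RDD's partitions directly (a sketch, assuming the same inputPath as the original script):

```scala
// Run this under both Spark 1.1 and 1.2 against the same input and
// compare: a different partition count would explain different task
// counts (and thus parser-initialization counts) in the flatMap stage.
val rdd = sc.textFile(inputPath)
println(s"partitions: ${rdd.partitions.length}")
```

If the counts differ, the 1.2 run may be initializing LogParser many more times simply because there are more tasks, independent of any closure-serialization change.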