Hi,

After some experiments, I found three methods that work for joining a
DStream with another dataset that is updated periodically.

1. Create an RDD inside the transform operation

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split("_"))
val filtered = words transform { rdd =>
  val spam = ssc.sparkContext.textFile("spam.txt").collect.toSet
  rdd.filter(!spam(_))
}

The caveat is that 'spam.txt' will be read in every batch.
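
If re-reading the file in every batch turns out to be too costly, one
option is to cache the set on the driver and reload it only after some
interval. The function passed to transform is invoked on the driver once
per batch, so it can consult such a cache. Below is a minimal sketch:
`Refreshing`, its parameters, and the 60-second interval are hypothetical
(not part of Spark), and it assumes checkpointing is disabled, since the
DStream graph, including this closure, gets serialized when checkpointing
is on.

```scala
// Hypothetical helper, not a Spark API: caches a value and reloads it
// only when the cached copy is at least ttlMillis old.
final class Refreshing[A](ttlMillis: Long, clock: () => Long)(load: () => A) {
  private var cached: Option[A] = None
  private var loadedAt: Long = 0L

  // Returns the cached value, calling load() first if it is missing or stale.
  def get(): A = synchronized {
    val now = clock()
    if (cached.isEmpty || now - loadedAt >= ttlMillis) {
      cached = Some(load())
      loadedAt = now
    }
    cached.get
  }
}

// Possible wiring with the code above (assumed, shown as comments only):
//
// val spamCache = new Refreshing[Set[String]](60000L, () => System.currentTimeMillis())(
//   () => ssc.sparkContext.textFile("spam.txt").collect.toSet)
//
// val filtered = words transform { rdd =>
//   val spam = spamCache.get()   // evaluated on the driver, once per batch
//   rdd.filter(!spam(_))
// }
```

The clock is passed in as a parameter only so that the reload logic can
be exercised without waiting for real time to pass.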

2. Use a broadcast variable held in a var

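  import java.util.concurrent.{Executors, TimeUnit}

  // getSpam is assumed to return the current spam set, e.g. a Set[String]
  var bc = ssc.sparkContext.broadcast(getSpam)
  val filtered = words.filter(!bc.value(_))

  // Re-broadcast the spam set every 5 seconds from a background thread
  val pool = Executors.newSingleThreadScheduledExecutor
  pool.scheduleAtFixedRate(new Runnable {
    def run(): Unit = {
      val obc = bc
      bc = ssc.sparkContext.broadcast(getSpam)
      obc.unpersist  // drop the old broadcast's cached copies on the executors
    }
  }, 0, 5, TimeUnit.SECONDS)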

I was surprised to come up with this solution, but I don't like the var,
and the unpersist call looks evil.
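
One way to avoid the bare var might be to keep the current broadcast in a
small holder that swaps in the new value and releases the old one in a
single place. The sketch below is only an illustration: `Swappable` is a
hypothetical helper (not a Spark API), and the Spark wiring is shown in
comments. Reading the holder inside transform means the lookup happens on
the driver at each batch. As far as the Broadcast API documentation goes,
unpersist only deletes the cached copies on the executors, and a
broadcast that is used afterwards is re-sent, so a batch that still
refers to the old value should not break.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper, not a Spark API: holds the current handle and
// passes the previous one to `release` whenever a new one is swapped in.
final class Swappable[H](initial: H)(release: H => Unit) {
  private val ref = new AtomicReference[H](initial)

  // The handle most recently installed.
  def current: H = ref.get()

  // Atomically install `next`, then release the handle it replaced.
  def swap(next: H): Unit = {
    val old = ref.getAndSet(next)
    release(old)
  }
}

// Possible wiring with the code above (assumed, shown as comments only):
//
// val holder = new Swappable(ssc.sparkContext.broadcast(getSpam))(_.unpersist())
//
// val filtered = words transform { rdd =>
//   val bc = holder.current            // looked up on the driver, per batch
//   rdd.filter(w => !bc.value(w))
// }
//
// pool.scheduleAtFixedRate(new Runnable {
//   def run(): Unit = holder.swap(ssc.sparkContext.broadcast(getSpam))
// }, 0, 5, TimeUnit.SECONDS)
```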

3. Use an accumulator

import scala.collection.mutable

val spam = ssc.sparkContext.accumulableCollection(getSpam.to[mutable.HashSet])
val filtered = words.filter(!spam.value(_))

    // body of the scheduled Runnable, as in method 2
    def run(): Unit = {
      spam.setValue(getSpam.to[mutable.HashSet])
    }

Now it looks less ugly...

Anyway, I still hope there's a better solution.

On Sun, Jan 18, 2015 at 2:12 AM, Jörn Franke <jornfra...@gmail.com> wrote:
> Can't you send a special event through Spark Streaming once the list is
> updated? So you have your normal events and a special reload event.
>
> On Jan 17, 2015 at 15:06, "Ji ZHANG" <zhangj...@gmail.com> wrote:
>>
>> Hi,
>>
>> I want to join a DStream with some other dataset, e.g. join a click
>> stream with a spam IP list. I can think of two possible solutions: one
>> is to use a broadcast variable, and the other is to use the transform
>> operation as described in the manual.
>>
>> But the problem is that the spam IP list will be updated outside of the
>> Spark Streaming program, so how can the program be notified to reload
>> the list?
>>
>> Broadcast variables are immutable.
>>
>> For the transform operation, is it costly to reload the RDD on every
>> batch? If it is, and I use RDD.persist(), does that mean I need to
>> launch a thread to regularly unpersist it so that it can get the
>> updates?
>>
>> Any ideas will be appreciated. Thanks.
>>
>> --
>> Jerry
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>



-- 
Jerry

