Access broadcast variable from within function passed to reduceByKey

2016-11-15 Thread coolgar
For example:

rows.reduceByKey(reduceKeyMapFunction)

def reduceKeyMapFunction(log1: Map[String, Long], log2: Map[String, Long]):
    Map[String, Long] = {
  val bcast = broadcastv.value          // the NPE is thrown here on the workers
  val countFields = dbh.getCountFields
  var aggs: Map[String, Long] = Map()
  countFields.foreach { f =>
    val valueSum = log1(f) + log2(f)
    aggs ++= Map(f -> valueSum)         // accumulate (a bare ++ discards its result)
  }
  aggs
}

I can't pass the broadcast variable to reduceKeyMapFunction directly, because
reduceByKey only accepts a two-parameter function (log1, log2). I create the
broadcast variable (broadcastv) on the driver, but I fear it won't be
initialized on the workers where reduceKeyMapFunction runs. When I try the
code above, accessing broadcastv throws an NPE.

Any ideas?
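
A common workaround, sketched below, is to build the two-argument reduce
function from a method that takes the Broadcast handle, so the handle is
captured in the closure and .value is only resolved on the executors. This is
only a minimal sketch: it assumes, purely for illustration, that the broadcast
carries the list of count fields (in the original code they come from
dbh.getCountFields).

import org.apache.spark.broadcast.Broadcast

// Hypothetical sketch: the reduce function closes over the Broadcast handle;
// .value is called on the executor, not on the driver.
def makeReducer(countFieldsBc: Broadcast[Seq[String]])
    : (Map[String, Long], Map[String, Long]) => Map[String, Long] =
  (log1, log2) => {
    val countFields = countFieldsBc.value
    countFields.map { f =>
      f -> (log1.getOrElse(f, 0L) + log2.getOrElse(f, 0L))
    }.toMap
  }

// rows.reduceByKey(makeReducer(broadcastv))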








Re: Using Apache Spark Streaming - how to handle changing data format within stream

2016-11-09 Thread coolgar
Solution provided by Cody K:

I may be misunderstanding, but you need to take each Kafka message and turn
it into multiple items in the transformed RDD?

so something like (pseudocode):

import scala.collection.mutable.ArrayBuffer

stream.flatMap { message =>
  val items = ArrayBuffer.empty[Map[String, String]]   // element type assumed
  var parser: Parser = null                            // Parser is sketched below
  message.split("\n").foreach { line =>
    if (line.startsWith("#"))        // it's a header
      parser = someParserBasedOn(line)
    else
      items += parser.parse(line)
  }
  items.iterator
}
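
Parser and someParserBasedOn are left abstract in the snippet above. A minimal
hypothetical sketch of what they might look like for the "#fields ..." header
format described in the original post (names and types are assumptions, not
part of Cody's reply):

// Hypothetical helper: pairs each whitespace-separated data value with the
// field name declared in the most recent "#fields" header line.
class Parser(fieldNames: Seq[String]) {
  def parse(line: String): Map[String, String] =
    fieldNames.zip(line.split("\\s+")).toMap
}

def someParserBasedOn(headerLine: String): Parser =
  new Parser(headerLine.stripPrefix("#fields").trim.split("\\s+").toSeq)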






Using Apache Spark Streaming - how to handle changing data format within stream

2016-11-07 Thread coolgar
I'm using Apache Spark Streaming with the Kafka direct consumer. The data
stream I'm receiving is log data that includes a header with each block of
messages. Each DStream can therefore contain many blocks of messages, each
with its own header.

The header tells me how to interpret the fields in the block of messages that
follows it. My challenge is that I'm building up (K,V) pairs to be processed
by reduceByKey(), and I need the header to know how to parse the fields that
follow it into those (K,V) pairs.
 
So each message received from Kafka may appear as follows (# denotes the
header line, \n denotes a new line):
#fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5
field6 field7\ndata4 data5 data6 data7\n...

Is there a way, without collecting all data back to the driver, to "grab"
the header and use it to subsequently process the messages that follow the
header until a new #fields comes along, rinse, repeat? 
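
For reference, a minimal sketch (all names hypothetical) of splitting one raw
message of this shape into per-header blocks, each keyed by the field names
declared in its "#fields" line; the blocks could then be turned into whatever
(K,V) pairs reduceByKey() needs:

import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: walk the lines of one message, starting a new block at
// every "#fields" header and attaching subsequent data lines to it.
def splitIntoBlocks(message: String): Seq[(Seq[String], Seq[String])] = {
  val blocks = ArrayBuffer.empty[(Seq[String], ArrayBuffer[String])]
  message.split("\n").foreach { line =>
    if (line.startsWith("#fields")) {
      val fields = line.stripPrefix("#fields").trim.split("\\s+").toSeq
      blocks += ((fields, ArrayBuffer.empty[String]))
    } else if (blocks.nonEmpty) {
      blocks.last._2 += line
    }
  }
  blocks.toSeq.map { case (fields, data) => (fields, data.toSeq) }
}

// splitIntoBlocks("#fields field1 field2 field3\ndata1 data2 data3")
//   => Seq((Seq("field1", "field2", "field3"), Seq("data1 data2 data3")))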




