Hi,

I'm testing Spark Streaming with an actor receiver. The actor keeps calling
store() to save a pair to Spark.

Once the job is launched, everything looks good on the UI: millions of events
get through every batch. However, I added logging to the first step and found
that only 20 or 40 events in a batch actually reach the first mapper. Any
idea what might be causing this?

I also log in the custom receiver right before the store() call, and it really
is calling that function millions of times.
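
For reference, the actor body is shaped roughly like the sketch below. This is
simplified with illustrative names (the real MessageRetriever also uses the
config and flow-control arguments passed through props, and the message shape
may differ); ActorHelper is the Spark trait that provides store():

import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// Simplified sketch -- the real actor does more work in receive
class MessageRetriever(name: String) extends Actor with ActorHelper {
  def receive = {
    case (key: String, m: Message) =>
      logger.info(s"storing key=$key") // this logs millions of times per batch
      store((key, m))                  // hand the (String, Message) pair to Spark
  }
}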

The receiver definition looks like:


val stream = ssc.actorStream[(String, Message)](
  MessageRetriever.props("message-retriever", mrSections.head, conf, flowControlDef, None, None),
  "Martini",
  StorageLevel.MEMORY_ONLY_SER)


The job looks like:

stream.map { pair =>
  logger.info(s"before pipeline key=${pair._1}") // Only a handful get logged even though there are over 1 million in a batch
  pair._2
}.flatMap { m =>
  // Event Builder
  logger.info(s"event builder thread-id=${Thread.currentThread().getId} user=${m.fields.getOrElse('user, "NA")}")
  ebHelper(m)
}.map { e =>
  // Event Normalizer
  logger.info(s"normalizer thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
  DefaultEventNormalizer.normalizeFields(e)
}.map { e =>
  // Event Resolver
  logger.info(s"resolver thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
  resolver(e)
}.flatMap { e =>
  // Event Discarder
  logger.info(s"discarder thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
  discarder(e)
}.map { e =>
  ep(e)
}
