Hi, I'm testing Spark Streaming with an actor receiver. The actor repeatedly calls store() to save a (String, Message) pair to Spark.
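For context, the receiving side is roughly this shape (a minimal sketch, assuming Spark 1.x's ActorHelper trait; MessageRetrieverSketch and the Message case class here are stand-ins for illustration, not the real classes):

    import akka.actor.{Actor, ActorLogging}
    import org.apache.spark.streaming.receiver.ActorHelper

    // Hypothetical stand-in for the real Message type.
    case class Message(fields: Map[Symbol, String])

    // Minimal sketch of the retriever actor: each incoming pair is logged and
    // then handed to Spark via store(), which is the call I see firing
    // millions of times per batch.
    class MessageRetrieverSketch extends Actor with ActorLogging with ActorHelper {
      def receive = {
        case pair: (String, Message) @unchecked =>
          log.info(s"before store key=${pair._1}") // receiver-side log, fires millions of times
          store(pair)                              // one record into Spark's receiver machinery
      }
    }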
Once the job is launched, everything looks good on the UI: millions of events get through every batch. However, I added logging to the first step and found that only 20 or 40 events per batch actually reach the first mapper. Any idea what might be causing this? I also log in the custom receiver right before the store() call, and that really is invoked millions of times.

The receiver definition looks like:

    val stream = ssc.actorStream[(String, Message)](
      MessageRetriever.props("message-retriever", mrSections.head, conf, flowControlDef, None, None),
      "Martini",
      StorageLevel.MEMORY_ONLY_SER)

The job looks like:

    stream.map { pair =>
      // Only a handful gets logged, although there are over 1 million in a batch
      logger.info(s"before pipeline key=${pair._1}")
      pair._2
    }.flatMap { m =>
      // Event Builder
      logger.info(s"event builder thread-id=${Thread.currentThread().getId} user=${m.fields.getOrElse('user, "NA")}")
      ebHelper(m)
    }.map { e =>
      // Event Normalizer
      logger.info(s"normalizer thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
      DefaultEventNormalizer.normalizeFields(e)
    }.map { e =>
      // Event Resolver
      logger.info(s"resolver thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
      resolver(e)
    }.flatMap { e =>
      // Event Discarder
      logger.info(s"discarder thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
      discarder(e)
    }.map { e =>
      ep(e)
    }
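In case it helps diagnose, this is the kind of cross-check I can run alongside the pipeline above (a sketch only, not part of the real job; stream and logger are the ones from the code above):

    // Count what actually arrives in each batch RDD, independent of the
    // per-record logging inside the mappers. foreachRDD runs on the driver
    // once per batch, and count() is an action that materializes the batch.
    stream.foreachRDD { rdd =>
      logger.info(s"records in this batch according to count(): ${rdd.count()}")
    }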