I have the following sequence of Spark Java API calls (Spark 2.0.2):

   1. A Kafka stream is processed by a map function that returns the
      string value tuple2._2() as a JavaDStream<String>, as in

         return tuple2._2();

   2. The returned JavaDStream is then processed by foreachPartition,
      which is wrapped in foreachRDD.

   3. foreachPartition's call function iterates over the records of the
      RDD partition, as in

         inputRDD.next();

When data is received, step 1 is executed, as expected. However,
inputRDD.next() in step 3 triggers a second call to the map function from
step 1, so the map function is called twice for every message:

- the first time when the message is received from the Kafka stream, and
- the second time when inputParams.next() is invoked on the Iterator in
  foreachPartition's call function.
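One possible explanation, offered as an assumption rather than a diagnosis: Spark transformations such as map are lazy, so they run only when an action or output operation consumes the data, and they run again for every such operation unless the data has been persisted. If the stream is consumed by more than one output operation, the map function would run once per operation. The plain-Java model below (no Spark involved) mimics that behavior with a call counter. All names in it (lazyMap, consume, countMapCalls, LazyMapDemo) are hypothetical, Map.Entry stands in for Tuple2, and the materialize flag only loosely plays the role of JavaDStream.cache().

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class LazyMapDemo {

    // Number of times the "map function" below has been invoked.
    static int mapCalls = 0;

    // Stand-in for the step 1 map function: returns tuple2._2() (here getValue()).
    static String extractValue(Map.Entry<String, String> tuple2) {
        mapCalls++;
        return tuple2.getValue();
    }

    // Lazily mapped view: nothing is computed until an iterator is consumed,
    // and every new iterator re-applies f to every source element.
    static <A, B> Iterable<B> lazyMap(List<A> source, Function<A, B> f) {
        return () -> new Iterator<B>() {
            private final Iterator<A> it = source.iterator();
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public B next() { return f.apply(it.next()); }
        };
    }

    // Stand-in for foreachPartition's call(): drains the iterator with next().
    static int consume(Iterator<String> inputParams) {
        int seen = 0;
        while (inputParams.hasNext()) {
            inputParams.next();
            seen++;
        }
        return seen;
    }

    // Runs `outputOps` consumers over the mapped data and returns mapCalls.
    // If `materialize` is true, results are computed once into a list first
    // (hypothetical analogue of caching the stream before reusing it).
    static int countMapCalls(List<Map.Entry<String, String>> records,
                             int outputOps, boolean materialize) {
        mapCalls = 0;
        Iterable<String> mapped = lazyMap(records, LazyMapDemo::extractValue);
        if (materialize) {
            List<String> copy = new ArrayList<>();
            for (String s : mapped) copy.add(s);
            mapped = copy;
        }
        for (int i = 0; i < outputOps; i++) {
            consume(mapped.iterator());
        }
        return mapCalls;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = Arrays.asList(
            new SimpleEntry<>("k1", "v1"),
            new SimpleEntry<>("k2", "v2"));
        System.out.println(countMapCalls(records, 0, false)); // 0: lazy, nothing consumed
        System.out.println(countMapCalls(records, 1, false)); // 2: once per message
        System.out.println(countMapCalls(records, 2, false)); // 4: twice per message
        System.out.println(countMapCalls(records, 2, true));  // 2: computed once, reused
    }
}
```

In this model, a second call per message appears only when the mapped data is traversed twice; whether that is what happens in the Spark job above depends on how many output operations consume the stream, which the post does not show.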

I also tried transforming the data in the map function, with these
signatures:

   public TestTransformedClass call(Tuple2<String, String> tuple2)    (step 1)
   public void call(Iterator<TestTransformedClass> inputParams)       (step 3)

and the same issue occurs. So the duplicate call happens regardless of
whether this sequence of Spark API calls involves a data transformation.

Questions:

   1. Since the message was already processed in step 1, why does
      inputRDD.next() in step 3 make a duplicate call to the map function
      from step 1?

   2. How do I fix it to avoid the duplicate invocation for every message?

Thanks.
