Hi Jungtaek,

Thanks for looking into it. We use spark-2.4.3.
I removed most of our code and pasted the rest here just to show the flow.
Sorry for the delay. I will try to provide a simple reproducer when I find
time, but this is really hurting us.

Another observation: FlatMapIdFedGroupFunction is called multiple times only
if I add some value to the outputEvents list; otherwise it is called only
once.
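
One guess that I have not verified on spark-2.4.3: the Dataset passed to
foreachBatch may be recomputed for every action run on it. My
ForeachBatchProcessor runs count() twice plus collectAsList() when the batch
is non-empty, but returns after the first count() when it is empty. That
would match three calls versus one. The sketch below is a plain-Java model of
that idea, not Spark code; LazyBatch, persist() and invocations() are
hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model, NOT Spark code. LazyBatch is a hypothetical stand-in for
// the Dataset handed to foreachBatch; it is an assumption that Spark behaves
// the same way here.
class RecomputeDemo {

  /** Re-runs its source on every action unless persist() was requested. */
  static final class LazyBatch {
    private final Supplier<List<String>> source;
    private boolean persistRequested;
    private List<String> cached; // filled by the first action after persist()

    LazyBatch(Supplier<List<String>> source) {
      this.source = source;
    }

    private List<String> rows() {
      if (cached != null) {
        return cached;
      }
      List<String> rows = source.get(); // the "upstream function" runs here
      if (persistRequested) {
        cached = rows;
      }
      return rows;
    }

    long count() {
      return rows().size();
    }

    List<String> collectAsList() {
      return new ArrayList<>(rows());
    }

    LazyBatch persist() {
      persistRequested = true;
      return this;
    }
  }

  /** Repeats the actions of ForeachBatchProcessor.call and returns how often the source ran. */
  static int invocations(List<String> output, boolean persist) {
    int[] calls = {0};
    LazyBatch batch = new LazyBatch(() -> {
      calls[0]++;
      return output;
    });
    if (persist) {
      batch.persist();
    }
    if (batch.count() == 0L) {                   // action 1
      return calls[0];
    }
    long logged = batch.count();                 // action 2 (the log line)
    List<String> events = batch.collectAsList(); // action 3
    return calls[0];
  }

  public static void main(String[] args) {
    System.out.println(invocations(Collections.<String>emptyList(), false)); // prints 1
    System.out.println(invocations(Arrays.asList("m1"), false));             // prints 3
    System.out.println(invocations(Arrays.asList("m1"), true));              // prints 1
  }
}
```

If this is the cause, persisting the batch Dataset at the start of call() and
unpersisting it at the end might bring it down to one invocation, but I still
need to confirm that.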

Could you suggest how to debug this, and share any guesses about what could
be wrong, so that I can focus my debugging on the right path?

Thanks,
Robin Kuttaiah

On Fri, Mar 12, 2021 at 8:43 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> Hi,
>
> Could you please provide the Spark version?
>
> It would also be very helpful if you could provide a simple reproducer,
> for example a project that can be built easily (mvn, gradle, or sbt) in
> your GitHub repository, plus the set of input data needed to see the
> behavior. Keep in mind that others aren't interested in your own code even
> if they are interested in the problematic behavior itself. It would be nice
> if you could minimize the hurdles to debugging.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Mar 11, 2021 at 4:54 PM Kuttaiah Robin <kutta...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a use case where I need to read uncorrelated events from a source
>> Kafka topic, correlate them, and push them forward to another target topic.
>>
>> I use Spark Structured Streaming with FlatMapGroupsWithStateFunction
>> along with GroupStateTimeout.ProcessingTimeTimeout(). After each
>> timeout, I apply some correlation logic to the group's events and push the
>> correlated events to another topic (via ForEachBatch). Uncorrelated events
>> are stored in the state until they are correlated with a future set of events.
>>
>> With this scenario, when I push a single event to the source topic, I see it
>> arrive three times in FlatMapGroupsWithStateFunction (at separate timestamps)
>> but only once in the ForEachBatch processor (which is good).
>>
>> The same event arriving three times in FlatMapGroupsWithStateFunction is a
>> problem, as it causes issues with my correlation logic.
>>
>> Can someone help me understand why it is seen three times in
>> FlatMapGroupsWithStateFunction?
>>
>> Code snippets are shown below. Please let me know what is missing and how
>> I can solve this.
>>
>> thanks,
>> Robin Kuttaiah
>>
>> *StreamQuery*
>>
>> Dataset<MilestoneEvent> sessionUpdates = null;
>> FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent>
>>     idstateUpdateFunction =
>>         new FlatMapIdFedGroupFunction(m_InsightEvent, m_InsightDeployment);
>> try {
>>   sessionUpdates = idFedKafkaEvents
>>       .groupByKey(
>>           new MapFunction<Row, String>() {
>>             private static final long serialVersionUID = -797571731893988577L;
>>             @Override public String call(Row event) {
>>               return event.getAs("EVENT_MODEL_ID_COL");
>>             }
>>           },
>>           Encoders.STRING())
>>       .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>>           Encoders.bean(IdentifierConnector.class),
>>           Encoders.bean(MilestoneEvent.class),
>>           GroupStateTimeout.ProcessingTimeTimeout());
>> } catch (Exception oException) {
>>   //log and throw back exception
>> }
>> ForeachBatchProcessor oForeachBatch = new ForeachBatchProcessor(
>>     m_InsightDeployment, m_InsightEvent, m_strQueryName);
>> DataStreamWriter<MilestoneEvent> events = sessionUpdates
>>     .writeStream()
>>     .queryName(queryName)
>>     .outputMode("append")
>>     .trigger(Trigger.ProcessingTime("5 seconds"))
>>     .option("checkpointLocation", checkpointLocation)
>>     .foreachBatch(oForeachBatch);
>>
>>
>>
>>
>> *FlatMapGroupsWithStateFunction:*
>>
>> public class FlatMapIdFedGroupFunction implements
>>     FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> {
>>
>>   public FlatMapIdFedGroupFunction(InsightEvent iEvent, InsightDeployment iDeployment) {
>>   }
>>
>>   @Override
>>   public Iterator<MilestoneEvent> call(String key, Iterator<Row> events,
>>       GroupState<IdentifierConnector> state) throws Exception {
>>     List<MilestoneEvent> outputEvents = new ArrayList<MilestoneEvent>();
>>     IdentifierConnector session = null;
>>     IdFederationUtil.write("FlatMapIdFedGroupFunction invoked for " + key + "  "
>>         + System.currentTimeMillis()); //Called thrice
>>     if (!state.exists()) {
>>       session = new IdentifierConnector();
>>     } else {
>>       session = state.get();
>>     }
>>     while (events.hasNext()) {
>>       Row event = events.next();
>>       MilestoneEvent mEventCurr = IdFederationUtil.getMilestoneEvent(event, insightEvent);
>>       outputEvents.add(mEventCurr);
>>       IdFederationUtil.write(".........." + mEventCurr.getMilestoneId()); //Called thrice
>>       break;
>>     }
>>     return outputEvents.iterator();
>>   }
>>
>> }
>>
>>
>> *ForEachBatchFunction:*
>>
>> public class ForeachBatchProcessor implements
>> VoidFunction2<Dataset<MilestoneEvent>, Long>, Serializable {
>>
>>   private static final long serialVersionUID = 1L;
>>
>>   public ForeachBatchProcessor(InsightDeployment in_oInsightDeployment,
>>       InsightEvent in_oInsightEvent, String in_strQueryName) {
>>   }
>>
>>   public void call(Dataset<MilestoneEvent> in_Rows, Long in_lBatchID)
>>       throws Exception {
>>     if (in_Rows.count() == 0L) {
>>       return;
>>     }
>>     IdFederationUtil.write("Processing batch " + in_lBatchID + "  "+
>> in_Rows.count());
>>     List<MilestoneEvent> events = in_Rows.collectAsList();
>>     for(MilestoneEvent m: events) {
>>       IdFederationUtil.write("......BATCH "+m.getMilestoneId());
>>     }
>>   }
>>
>> }