Ron Cai created BEAM-6859:
-----------------------------

             Summary: DoFn tearDown function will not be invoked if there is no data in the batch
                 Key: BEAM-6859
                 URL: https://issues.apache.org/jira/browse/BEAM-6859
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
    Affects Versions: 2.11.0
            Reporter: Ron Cai


In the implementation of [MultiDoFnFunction|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java]:
{code:java}
@Override
public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
    throws Exception {
  if (!wasSetupCalled) {
    DoFnInvokers.invokerFor(doFn).invokeSetup();
    wasSetupCalled = true;
  }
  ...

  return new SparkProcessContext<>(
          doFn,
          doFnRunnerWithMetrics,
          outputManager,
          stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
      .processPartition(iter)
      .iterator();
}
{code}
This calls the setup function of the DoFn on every batch in Spark streaming, and the tearDown function of the DoFn is meant to be invoked by the [SparkProcessContext|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java] instance. But in the implementation of SparkProcessContext.processPartition(), if the partition is empty, it returns an empty ArrayList instance directly. So if there is no data in the batch, the tearDown function of the DoFn is never invoked, because it is only called from the ProcCtxtIterator instance, which is created only when there is data (partition.hasNext() == true):
{code:java}
Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition)
    throws Exception {

  // skip if partition is empty.
  if (!partition.hasNext()) {
    return new ArrayList<>();
  }

  // process the partition; finishBundle() is called from within the output iterator.
  return this.getOutputIterable(partition, doFnRunner);
}
{code}
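A minimal sketch of one possible fix, assuming the teardown can simply be forced on the empty-partition path (the invokeTeardown() call mirrors the invokeSetup() call that MultiDoFnFunction already makes; this is a sketch, not an actual patch):
{code:java}
// Sketch only: inside SparkProcessContext, tear the DoFn down explicitly on
// the empty-partition path, so the DoFn set up by MultiDoFnFunction is
// always released even when the batch carries no data.
Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition)
    throws Exception {

  if (!partition.hasNext()) {
    // The ProcCtxtIterator that normally calls teardown is never created
    // for an empty partition, so invoke teardown here before returning.
    DoFnInvokers.invokerFor(doFn).invokeTeardown();
    return new ArrayList<>();
  }

  // process the partition; finishBundle() is called from within the output iterator.
  return this.getOutputIterable(partition, doFnRunner);
}
{code}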
With this implementation, if you build a pipeline that reads with KafkaIO.read() and writes back to Kafka with KafkaIO.write(), run it as a Spark streaming application, and send no data to the Kafka topic, the thread count of the Kafka producer will keep increasing until the application eventually fails with an OOM.
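A minimal sketch of such a reproduction pipeline (the broker address and topic names are placeholders, and the Spark runner options wiring is omitted):
{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class EmptyBatchKafkaRepro {
  public static void main(String[] args) {
    // Run with the Spark runner in streaming mode, e.g.
    // --runner=SparkRunner --streaming (options wiring omitted here).
    Pipeline p = Pipeline.create();

    p.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092") // placeholder broker
                .withTopic("input-topic")               // placeholder topic
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply(
            KafkaIO.<String, String>write()
                .withBootstrapServers("localhost:9092")
                .withTopic("output-topic")              // placeholder topic
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class));

    // Send nothing to input-topic: every micro-batch is empty, so setup()
    // runs per batch, teardown() never runs, and producer threads pile up.
    p.run().waitUntilFinish();
  }
}
{code}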