Maximilian Michels created BEAM-6929:
----------------------------------------

             Summary: Session Windows with lateness cause NullPointerException in Flink Runner
                 Key: BEAM-6929
                 URL: https://issues.apache.org/jira/browse/BEAM-6929
             Project: Beam
          Issue Type: Bug
          Components: runner-flink
            Reporter: Maximilian Michels
            Assignee: Maximilian Michels
             Fix For: 2.12.0


Reported on the mailing list:

{noformat}
I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink Cluster - 1.7.2.

I have this flow in my pipeline:
KafkaSource(withCreateTime())
  --> ApplyWindow(SessionWindow with gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes, default trigger)
  --> BeamSQL(GroupBy query)
  --> Window.remerge()
  --> Enrichment
  --> KafkaSink

I am generating data such that the first two records belong to two different sessions. Then, before the first session expires, I generate a third record whose timestamp falls between the first two, so that the two sessions merge into a single session.

For example, these are the sample input and output obtained when I ran the same pipeline in the DirectRunner.

Sample Input:
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}

Sample Output:
{"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
 15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
{"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
 15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
{"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27
 15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}

Where "NumberOfRecords" is the count, "WST" is the Avro field Name which 
indicates the window start time for the session window. Similarly "WET" 
indicates the window End time of the session window. I am getting "WST" and 
"WET" after remerging and applying ParDo(Enrichment stage of the pipeline).

The program ran successfully in the DirectRunner, but in the FlinkRunner I get the following exception when the third record arrives:

2019-03-27 15:31:00,442 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow) -> (Window.Into()/Window.Assign.out -> DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous) -> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem, DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter) -> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous) -> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) -> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)) (1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
2019-03-27 15:33:25,427 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey -> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous) -> DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous) -> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc) -> DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) -> DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo) -> DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter) -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous) -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter) (1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to FAILED.
org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
        at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:457)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:703)
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:674)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.mergeAccumulators(FlinkStateInternals.java:517)
        at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:192)
        at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:162)
        at org.apache.beam.runners.core.NonEmptyPanes$GeneralNonEmptyPanes.onMerge(NonEmptyPanes.java:132)
        at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:507)
        at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
        at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
        at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
        at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
        at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)

Is this a known issue with the FlinkRunner? Are session windows with lateness @experimental in the FlinkRunner?

I have also tried with the beam-runners-flink_2.11 runner against a Flink 1.5.3 cluster and hit the same exception.

I have also tried generating the data with a lateness of 0, and everything works as expected, so merging the windows of records that belong to the same session does not seem to be a problem by itself.

{noformat}
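
For reference, below is a minimal, self-contained sketch of the windowing setup described in the report: session windows with a 1-minute gap, 3 minutes of allowed lateness, accumulating fired panes and the default trigger, followed by a GroupByKey-based aggregation, {{Window.remerge()}} and a ParDo that reads the merged session window (the WST/WET enrichment step). This is not the reporter's pipeline: the class name, the key/value placeholders and the bounded {{Create}} source are made up for illustration (the report used KafkaIO with {{withCreateTime()}} and out-of-order event times), so treat it as a starting point for a reproduction rather than a verified one.

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** Hypothetical reproduction sketch for BEAM-6929; names and input data are placeholders. */
public class SessionLatenessRepro {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(Create.timestamped(
            // The first two records open two separate 1-minute sessions.
            TimestampedValue.of(KV.of("str1", 1L), Instant.parse("2019-03-27T15:27:44Z")),
            TimestampedValue.of(KV.of("str1", 1L), Instant.parse("2019-03-27T15:28:51Z")),
            // The third record arrives last, but its timestamp bridges the gap,
            // so the two sessions must merge into one.
            TimestampedValue.of(KV.of("str1", 1L), Instant.parse("2019-03-27T15:28:26Z"))))
        // Windowing as described in the report: 1-minute gap, 3 minutes of
        // allowed lateness, accumulating fired panes, default trigger.
        .apply(Window.<KV<String, Long>>into(
                Sessions.withGapDuration(Duration.standardMinutes(1)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.standardMinutes(3))
            .accumulatingFiredPanes())
        // Stand-in for the BeamSQL GroupBy: any GroupByKey-based aggregation
        // goes through the same ReduceFnRunner window-merging path.
        .apply(Count.perKey())
        // Remerge and read the merged session window, mirroring the WST/WET
        // enrichment stage of the reported pipeline.
        .apply(Window.remerge())
        .apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c, IntervalWindow window) {
            c.output(c.element().getKey() + "," + c.element().getValue()
                + ",WST=" + window.start() + ",WET=" + window.end());
          }
        }));

    p.run().waitUntilFinish();
  }
}
{code}

Run with {{--runner=FlinkRunner}} to compare against {{--runner=DirectRunner}}. Note that a bounded {{Create}} source does not produce genuinely late data, so the KafkaIO-based setup from the report may still be needed to hit the merge-with-lateness path.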


