[ 
https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576998#comment-17576998
 ] 

Michael commented on FLINK-28817:
---------------------------------

Clarification for the problem:

1. --  HybridSourceSplitEnumerator.switchEnumerator failed with 
com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed out
Caused by: java.net.SocketTimeoutException: Read timed out
This is an intermittent error; it usually goes away when Flink recovers from 
the checkpoint and repeats the operation.

2. --  Flink starts recovering from checkpoint:
CheckpointCoordinator - Restoring job 00000000000000000000000000000000 from 
Checkpoint
SourceCoordinator - Closing SourceCoordinator for source Source: hybrid-source.
SourceCoordinator - Restoring SplitEnumerator of source Source: hybrid-source 
from checkpoint.
SourceCoordinator - Starting split enumerator for source Source: hybrid-source.
HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=788
HybridSourceSplitEnumerator - Starting enumerator for sourceIndex=788

3. --  HybridSourceSplitEnumerator receives 
SourceReaderFinishedEvent{sourceIndex=-1}
HybridSourceSplitEnumerator - handleSourceEvent 
SourceReaderFinishedEvent{sourceIndex=-1} subtask=6

4. --  Processing this event causes:
2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught 
exception in the SplitEnumerator for Source Source: hybrid-source while 
handling operator event 
SourceEventWrapper[SourceReaderFinishedEvent{sourceIndex=-1}] from subtask 6. 
Triggering job failover.
java.lang.NullPointerException: Source for index=0 is not available from 
sources: {788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
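
The lookup failure in step 4 can be illustrated in isolation. This is only a 
minimal sketch under two assumptions taken from the exception message: the 
restored source map holds only the current index (788), and the enumerator 
asks for the source after the one the reader reports (sourceIndex + 1, so -1 
becomes 0). SwitchedSourcesSketch and nextSourceIndex are hypothetical 
stand-ins, not the Flink classes, and Objects.requireNonNull is used in place 
of Flink's Preconditions.checkNotNull.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for SwitchedSources; not the Flink class. */
public class SwitchedSourcesSketch {
    // Maps source index -> source; a String stands in for the real source object.
    private final Map<Integer, String> sources = new HashMap<>();

    public void put(int sourceIndex, String source) {
        sources.put(sourceIndex, source);
    }

    /** Null-checked lookup, similar in spirit to the check that failed in the log. */
    public String sourceOf(int sourceIndex) {
        return Objects.requireNonNull(
                sources.get(sourceIndex),
                "Source for index=" + sourceIndex
                        + " is not available from sources: " + sources);
    }

    /** Assumption: the next source for a reader follows the index it reported. */
    public static int nextSourceIndex(int finishedSourceIndex) {
        return finishedSourceIndex + 1;
    }

    public static void main(String[] args) {
        SwitchedSourcesSketch restored = new SwitchedSourcesSketch();
        restored.put(788, "SppFileSource"); // only the restored source is known

        // The reader reported sourceIndex=-1, so index 0 is requested.
        int requested = nextSourceIndex(-1);
        try {
            restored.sourceOf(requested);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, any event with sourceIndex=-1 that arrives after a 
restore at a later index would request a source that is not in the map.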

I'm running my own build of the HybridSource classes with additional logging, 
so line numbers and some names may differ from the Flink source on GitHub.

My observation: the problem is intermittent. Sometimes recovery works, i.e. 
SourceReaderFinishedEvent arrives with the correct sourceIndex. From my log, 
recovery works when my SourceFactory.create() is executed BEFORE 
HybridSourceSplitEnumerator handles the SourceReaderFinishedEvent.
If HybridSourceSplitEnumerator - handleSourceEvent is executed before my 
SourceFactory.create(), then the SourceReaderFinishedEvent carries 
sourceIndex=-1.

[~thw], [~mason6345], could you please look at this issue?

The JobManager log with the Preconditions.checkNotNull error is attached.

> NullPointerException in HybridSource when restoring from checkpoint
> -------------------------------------------------------------------
>
>                 Key: FLINK-28817
>                 URL: https://issues.apache.org/jira/browse/FLINK-28817
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.14.4, 1.15.1
>            Reporter: Michael
>            Priority: Major
>         Attachments: bf-29-JM-err-analysis.log
>
>
> Scenario:
>  # CheckpointCoordinator - Completed checkpoint 14 for job 
> 00000000000000000000000000000000
>  # HybridSource successfully completed processing a few SourceFactories that 
> read from s3
>  # The next SourceFactory tries to read the contents of an s3 dir, which 
> causes the error Unable to execute HTTP request: Read timed out
>  # CheckpointCoordinator - Restoring job 00000000000000000000000000000000 
> from Checkpoint 14
>  # HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=47
>  # This restore fails because of a NullPointerException in 
> HybridSourceSplitEnumerator.close
>  # Because of this issue, all subsequent restores from the checkpoint also fail
> Extract from the log: --------------
> 2022/08/02 22:26:51.227 INFO  o.a.f.r.c.CheckpointCoordinator - Restoring job 
> 00000000000000000000000000000000 from Checkpoint 14 @ 1659478803949 for 
> 00000000000000000000000000000000 located at 
> s3://spp-state-371299021277-tech-aidata-di/mb-backfill-jul-20-backfill-prd/2/checkpoints/00000000000000000000000000000000/chk-14.
> 2022/08/02 22:26:51.240 INFO  o.a.f.r.c.CheckpointCoordinator - No master 
> state to restore
> 2022/08/02 22:26:51.240 INFO  o.a.f.r.o.c.RecreateOnResetOperatorCoordinator 
> - Resetting coordinator to checkpoint.
> 2022/08/02 22:26:51.241 INFO  o.a.f.r.s.c.SourceCoordinator - Closing 
> SourceCoordinator for source Source: hybrid-source.
> 2022/08/02 22:26:51.424 INFO  o.a.f.r.s.c.SourceCoordinator - Restoring 
> SplitEnumerator of source Source: hybrid-source from checkpoint.
> 2022/08/02 22:26:51.425 INFO  o.a.f.r.s.c.SourceCoordinator - Starting split 
> enumerator for source Source: hybrid-source.
> 2022/08/02 22:26:51.426 INFO  c.i.d.s.f.s.c.b.HourlyFileSourceFactory - 
> Reading input data from path 
> s3://idl-kafka-connect-ued-raw-uw2-data-lake-prd/data/topics/sbseg-qbo-clickstream/d_20220729-2300
>  for 2022-07-29T23:00:00Z
> 2022/08/02 22:26:51.426 INFO  o.a.f.c.b.s.h.HybridSourceSplitEnumerator - 
> Restoring enumerator for sourceIndex=47
>  
> 2022/08/02 22:26:51.435 INFO  o.a.f.runtime.jobmaster.JobMaster - Trying to 
> recover from a global failure.
> org.apache.flink.util.FlinkException: Global failure triggered by 
> OperatorCoordinator for 'Source: hybrid-source -> decrypt -> map2Events -> 
> filterOutNulls -> assignTimestampsAndWatermarks -> logRawJson' (operator 
> fd9fbc680ee884c4eafd0b9c2d3d007f).
> at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
> at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> ...
> Caused by: java.lang.NullPointerException: null
> at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.close(HybridSourceSplitEnumerator.java:246)
> at org.apache.flink.runtime.source.coordinator.SourceCoordinator.close(SourceCoordinator.java:151)
> at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:70)
> at java.lang.Thread.run(Thread.java:750)
> -----------------------------------



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
