[ 
https://issues.apache.org/jira/browse/FLINK-20277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

谢波 updated FLINK-20277:
-----------------------
    Description: 
流式消费Hive表,出现失败时,任务无法正常恢复,一直重启。

一直报错:The ContinuousFileMonitoringFunction has already restored from a previous 
Flink version.

 

{color:#FF0000}java.io.FileNotFoundException: File does not exist: 
hdfs://nameservice1/rawdata/db/bw_hana/sapecc/hepecc_ekko_cut{color}
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1270)
 ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1262)
 ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
 at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 ~[hadoop-common-2.6.0-cdh5.16.2.jar:?]
 at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1262)
 ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
 at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:588)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:279)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:251)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:215)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]

 

 

2020-11-23 05:00:33,313 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Split Reader: 
HiveFileMonitoringFunction -> Sink: 
Sink(table=[default_catalog.default_database.kafka_hepecc_ekko_cut_json], 
fields=[mandt, ebeln, bukrs, bstyp, bsart, bsakz, loekz, statu, aedat, ernam, 
pincr, lponr, lifnr, spras, zterm, zbd1t, zbd2t, zbd3t, zbd1p, 
zbd2p, ekorg, ekgrp, waers, wkurs, kufix, bedat, kdatb, kdate,
 bwbdt, angdt, bnddt, gwldt, ausnr, angnr, ihran, ihrez, verkf, telf1, llief, 
kunnr, konnr, abgru, autlf, weakt, reswk, lblif, inco1, inco2, 
 ktwrt, submi, knumv, kalsm, stafo, lifre, exnum, unsez, logsy, upinc, stako, 
frggr, frgsx, frgke, frgzu, frgrl, lands, lphis, adrnr, stceg_l,
 stceg, absgr, addnr, kornr, memory, procstat, rlwrt, revno, scmproc, 
reason_code, memorytype, rettp, retpc, dptyp, dppct, dpamt, dpdat, msr_id, 
hierarchy_exists, threshold_exists, legal_contract, description, 
release_date, force_id, force_cnt, reloc_id, reloc_seq_id, source_logsys, 
auflg, yxcort, ysyb, ypsfs, yxqlx, yjplx, ylszj, yxdry, yxdrymc, ylbid1, 
yqybm, ysxpt_order, yy_write_if, fanpfg, yresfg, yrcofg, yretxt, y
 ...skipping...
 {color:#FF0000}java.lang.IllegalArgumentException: The 
ContinuousFileMonitoringFunction has already restored from a previous Flink 
version.{color}
 at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251]

 

  was:
流式消费Hive表,出现失败时,任务无法正常恢复,一直重启。

一直报错:The ContinuousFileMonitoringFunction has already restored from a previous 
Flink version.

 

java.io.FileNotFoundException: File does not exist: 
hdfs://nameservice1/rawdata/db/bw_hana/sapecc/hepecc_ekko_cut
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1270)
 ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1262)
 ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
 at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 ~[hadoop-common-2.6.0-cdh5.16.2.jar:?]
 at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1262)
 ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
 at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:588)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:279)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:251)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:215)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]

 

 

2020-11-23 05:00:33,313 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Split Reader: 
HiveFileMonitoringFunction -> S
ink: Sink(table=[default_catalog.default_database.kafka_hepecc_ekko_cut_json], 
fields=[mandt, ebeln, bukrs, bstyp, bsart, bsakz, loekz, statu
, aedat, ernam, pincr, lponr, lifnr, spras, zterm, zbd1t, zbd2t, zbd3t, zbd1p, 
zbd2p, ekorg, ekgrp, waers, wkurs, kufix, bedat, kdatb, kdate,
 bwbdt, angdt, bnddt, gwldt, ausnr, angnr, ihran, ihrez, verkf, telf1, llief, 
kunnr, konnr, abgru, autlf, weakt, reswk, lblif, inco1, inco2, 
ktwrt, submi, knumv, kalsm, stafo, lifre, exnum, unsez, logsy, upinc, stako, 
frggr, frgsx, frgke, frgzu, frgrl, lands, lphis, adrnr, stceg_l,
 stceg, absgr, addnr, kornr, memory, procstat, rlwrt, revno, scmproc, 
reason_code, memorytype, rettp, retpc, dptyp, dppct, dpamt, dpdat, msr_
id, hierarchy_exists, threshold_exists, legal_contract, description, 
release_date, force_id, force_cnt, reloc_id, reloc_seq_id, source_logsys
, auflg, yxcort, ysyb, ypsfs, yxqlx, yjplx, ylszj, yxdry, yxdrymc, ylbid1, 
yqybm, ysxpt_order, yy_write_if, fanpfg, yresfg, yrcofg, yretxt, y
...skipping...
java.lang.IllegalArgumentException: The ContinuousFileMonitoringFunction has 
already restored from a previous Flink version.
 at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251]

 


> flink-1.11.2 ContinuousFileMonitoringFunction cannot restore from failure
> -------------------------------------------------------------------------
>
>                 Key: FLINK-20277
>                 URL: https://issues.apache.org/jira/browse/FLINK-20277
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Ecosystem
>    Affects Versions: 1.11.2
>            Reporter: 谢波
>            Assignee: godfrey he
>            Priority: Blocker
>             Fix For: 1.11.3
>
>
> 流式消费Hive表,出现失败时,任务无法正常恢复,一直重启。
> 一直报错:The ContinuousFileMonitoringFunction has already restored from a 
> previous Flink version.
>  
> {color:#FF0000}java.io.FileNotFoundException: File does not exist: 
> hdfs://nameservice1/rawdata/db/bw_hana/sapecc/hepecc_ekko_cut{color}
>  at 
> org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1270)
>  ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
>  at 
> org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1262)
>  ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
>  at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>  ~[hadoop-common-2.6.0-cdh5.16.2.jar:?]
>  at 
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1262)
>  ~[hadoop-hdfs-2.6.0-cdh5.16.2.jar:?]
>  at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:588)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:279)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:251)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:215)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  
>  
> 2020-11-23 05:00:33,313 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Split Reader: 
> HiveFileMonitoringFunction -> S
>  ink: 
> Sink(table=[default_catalog.default_database.kafka_hepecc_ekko_cut_json], 
> fields=[mandt, ebeln, bukrs, bstyp, bsart, bsakz, loekz, statu
>  , aedat, ernam, pincr, lponr, lifnr, spras, zterm, zbd1t, zbd2t, zbd3t, 
> zbd1p, zbd2p, ekorg, ekgrp, waers, wkurs, kufix, bedat, kdatb, kdate,
>  bwbdt, angdt, bnddt, gwldt, ausnr, angnr, ihran, ihrez, verkf, telf1, llief, 
> kunnr, konnr, abgru, autlf, weakt, reswk, lblif, inco1, inco2, 
>  ktwrt, submi, knumv, kalsm, stafo, lifre, exnum, unsez, logsy, upinc, stako, 
> frggr, frgsx, frgke, frgzu, frgrl, lands, lphis, adrnr, stceg_l,
>  stceg, absgr, addnr, kornr, memory, procstat, rlwrt, revno, scmproc, 
> reason_code, memorytype, rettp, retpc, dptyp, dppct, dpamt, dpdat, msr_
>  id, hierarchy_exists, threshold_exists, legal_contract, description, 
> release_date, force_id, force_cnt, reloc_id, reloc_seq_id, source_logsys
>  , auflg, yxcort, ysyb, ypsfs, yxqlx, yjplx, ylszj, yxdry, yxdrymc, ylbid1, 
> yqybm, ysxpt_order, yy_write_if, fanpfg, yresfg, yrcofg, yretxt, y
>  ...skipping...
>  {color:#FF0000}java.lang.IllegalArgumentException: The 
> ContinuousFileMonitoringFunction has already restored from a previous Flink 
> version.{color}
>  at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>  at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to