elon_X created FLINK-35088:
------------------------------

             Summary: watermark alignment maxAllowedWatermarkDrift and 
updateInterval param need check
                 Key: FLINK-35088
                 URL: https://issues.apache.org/jira/browse/FLINK-35088
             Project: Flink
          Issue Type: Improvement
          Components: API / Core, Runtime / Coordination
    Affects Versions: 1.16.1
            Reporter: elon_X
         Attachments: image-2024-04-11-20-12-29-951.png

When I use watermark alignment,

1.I found that setting maxAllowedWatermarkDrift to a negative number initially 
led me to believe it could support delaying the consumption of the source, so I 
tried it. Then, the upstream data flow would hang indefinitely.

Root cause:
{code:java}
long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()               
  + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark < 
lastEmittedWatermark, then the SourceReader will be blocked indefinitely and 
cannot recover.

I'm not sure if this is a supported feature of watermark alignment. If it's 
not, I think an additional parameter validation should be implemented to throw 
an exception on the client side if the value is negative.

2.The updateInterval parameter also lacks validation. If I set it to 0, the 
task will throw an exception when starting the job manager. The JDK class 
java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and 
throws the exception.
{code:java}
java.lang.IllegalArgumentException: null
        at 
java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
 ~[?:1.8.0_351]
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:191)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:59)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:42)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) 
~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346) 
~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 [?:1.8.0_351]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_351]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_351]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_351]{code}
Therefore, I believe it's necessary to validate these two parameters to ensure 
that exceptions are thrown on the client side to alert the user.

!image-2024-04-11-20-12-29-951.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to