[jira] [Created] (FLINK-31610) Refactoring of LocalBufferPool
Anton Kalashnikov created FLINK-31610: - Summary: Refactoring of LocalBufferPool Key: FLINK-31610 URL: https://issues.apache.org/jira/browse/FLINK-31610 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.17.0 Reporter: Anton Kalashnikov
The FLINK-31293 bug highlighted an issue with the internal mutual consistency of different fields in LocalBufferPool, e.g.:
- `numberOfRequestedOverdraftMemorySegments`
- `numberOfRequestedMemorySegments`
- `availableMemorySegments`
- `currentPoolSize`
Most of the problems have already been fixed (I hope), but it is a good idea to reorganize the code in such a way that all invariants between these fields are clearly defined and difficult to break. As one example, I propose getting rid of `numberOfRequestedOverdraftMemorySegments` and using the existing `numberOfRequestedMemorySegments` instead. That means:
- the pool is available when `!availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0`
- we don't request a new `ordinary` buffer when `numberOfRequestedMemorySegments >= currentPoolSize` but request an overdraft buffer instead
- `setNumBuffers` should work automatically without any changes
I think we can come up with a couple of such improvements to simplify the code. -- This message was sent by Atlassian Jira (v8.20.10#820010)
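The proposed unification can be sketched with a toy pool model. This is a simplified illustration of the invariant, not the real LocalBufferPool API: the class, `maxOverdraftBuffers` field, and `byte[]` "segments" are stand-ins, and a single `numberOfRequestedMemorySegments` counter covers both ordinary and overdraft requests ("overdraft" simply means the counter is above `currentPoolSize`).

```java
import java.util.ArrayDeque;

// Minimal sketch (NOT the real LocalBufferPool): one request counter for
// both ordinary and overdraft buffers. All names are illustrative.
class SimpleBufferPool {
    final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();
    int numberOfRequestedMemorySegments = 0;
    int currentPoolSize;
    final int maxOverdraftBuffers;
    int unavailableSubpartitionsCount = 0;

    SimpleBufferPool(int currentPoolSize, int maxOverdraftBuffers) {
        this.currentPoolSize = currentPoolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    // The availability rule proposed in the ticket.
    boolean isAvailable() {
        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;
    }

    byte[] requestMemorySegment() {
        if (!availableMemorySegments.isEmpty()) {
            return availableMemorySegments.poll();
        }
        // Past currentPoolSize this becomes an overdraft request - same counter.
        if (numberOfRequestedMemorySegments < currentPoolSize + maxOverdraftBuffers) {
            numberOfRequestedMemorySegments++;
            return new byte[32 * 1024]; // stand-in for a global-pool segment
        }
        return null; // fully exhausted, the caller has to wait
    }

    void recycle(byte[] segment) {
        if (numberOfRequestedMemorySegments > currentPoolSize) {
            // Overdraft (or excess after setNumBuffers) goes straight back.
            numberOfRequestedMemorySegments--;
        } else {
            availableMemorySegments.add(segment);
        }
    }

    // No special overdraft handling needed: once the counter exceeds the new
    // pool size, excess segments are simply returned on recycle.
    void setNumBuffers(int newPoolSize) {
        this.currentPoolSize = newPoolSize;
    }
}
```

With this shape, the overdraft state is derived (`counter - currentPoolSize`) rather than stored, so it cannot drift out of sync with the other fields.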
[jira] [Created] (FLINK-30278) Unexpected config mutation in SinkTransformationTranslator
Anton Kalashnikov created FLINK-30278: - Summary: Unexpected config mutation in SinkTransformationTranslator Key: FLINK-30278 URL: https://issues.apache.org/jira/browse/FLINK-30278 Project: Flink Issue Type: Bug Components: Runtime / Configuration Affects Versions: 1.16.0, 1.17.0 Reporter: Anton Kalashnikov
If we forbid changing the configuration programmatically (`execution.program-config.enabled`) and try to use `FileSink`, the following exception occurs:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not allowed configuration change(s) were detected: - Configuration parallelism.default:1 not allowed in the configuration object ExecutionConfig. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:364) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:225) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:319) at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:262) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) at 
java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) Caused by: org.apache.flink.client.program.MutatedConfigurationException: Not allowed configuration change(s) were detected: - Configuration parallelism.default:1 not allowed in the configuration object ExecutionConfig. at org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049) at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:81) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347) ... 
16 more
It happens because inside `SinkTransformationTranslator` we have the following logic:
* Remember the current parallelism
* Set the parallelism to the default
* Do the transformation
* Set the parallelism back to the remembered one
But if the initial parallelism is already the default, we should actually do nothing; according to the current logic we explicitly write the default value into the configuration, which is exactly the kind of programmatic config mutation we want to avoid.
See org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator.SinkExpander#executionEnvironment:341 -- This message was sent by Atlassian Jira (v8.20.10#820010)
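The remember/override/restore dance and the fix idea can be sketched with a toy config class. `Config`, its `mutations` counter, and `runWithDefaultParallelism` are invented stand-ins (not Flink's classes); the point is that when the current parallelism already equals the default, the translator should touch the configuration zero times, so a "forbidden mutation" check never fires.

```java
// Sketch of the fix: only perform the set/restore dance when the current
// parallelism actually differs from the default. All names are invented.
class Config {
    static final int DEFAULT_PARALLELISM = 1;
    private int parallelism = DEFAULT_PARALLELISM;
    int mutations = 0; // counts programmatic changes, like the mutation check does

    void setParallelism(int p) { parallelism = p; mutations++; }
    int getParallelism() { return parallelism; }

    /** Run `transformation` with default parallelism, restoring afterwards. */
    void runWithDefaultParallelism(Runnable transformation) {
        int remembered = getParallelism();
        boolean needsOverride = remembered != DEFAULT_PARALLELISM;
        if (needsOverride) { // the buggy version would set unconditionally
            setParallelism(DEFAULT_PARALLELISM);
        }
        try {
            transformation.run();
        } finally {
            if (needsOverride) {
                setParallelism(remembered);
            }
        }
    }
}
```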
[jira] [Created] (FLINK-28468) Creating benchmark channel in Apache Flink slack
Anton Kalashnikov created FLINK-28468: - Summary: Creating benchmark channel in Apache Flink slack Key: FLINK-28468 URL: https://issues.apache.org/jira/browse/FLINK-28468 Project: Flink Issue Type: Improvement Reporter: Anton Kalashnikov
We need to create a Slack channel to which the results of the benchmarks will be sent. Steps:
* Create a `dev-benchmarks` channel (help from an admin of the Apache Flink Slack is needed)
* Create a `Slack app` for sending benchmark results to Slack (admin help as well).
* Configure the existing Jenkins with the new `Slack app`'s token to send the benchmark results to this new channel. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28407) Make Jenkins benchmark script more autonomous
Anton Kalashnikov created FLINK-28407: - Summary: Make Jenkins benchmark script more autonomous Key: FLINK-28407 URL: https://issues.apache.org/jira/browse/FLINK-28407 Project: Flink Issue Type: Improvement Components: Benchmarks Reporter: Anton Kalashnikov
Right now, we have Jenkins scripts in the benchmarks repository which contain all the logic for launching benchmarks and sending data to Codespeed. But unfortunately, it is not clear to external observers how to use them in case the current Jenkins is lost. The suggestion is:
* add information to the Readme file about Jenkins and Codespeed.
* add default values to the script rather than keeping them only in Jenkins.
After this task, anybody should easily be able to configure these benchmarks on their own Jenkins + Codespeed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28243) Considering only time of targeted operation for calculation benchmarks performance
Anton Kalashnikov created FLINK-28243: - Summary: Considering only time of targeted operation for calculation benchmarks performance Key: FLINK-28243 URL: https://issues.apache.org/jira/browse/FLINK-28243 Project: Flink Issue Type: Improvement Components: Benchmarks Reporter: Anton Kalashnikov
Right now, in many benchmarks, the iteration time consists of many parts which are not always connected to the target of the particular benchmark. For example, if we want to benchmark the async operator, we want to take into account only the time spent in this operator, but currently the time of one iteration of asyncOperatorBenchmark is `job/tasks initialization phase` + `operator execution` + `shutdown phase`. Ideally, we want to somehow isolate the `operator execution` time and consider only it in this benchmark, while the `job/tasks initialization phase` should be benchmarked separately. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28241) Massive regression on 20.06.2021
Anton Kalashnikov created FLINK-28241: - Summary: Massive regression on 20.06.2021 Key: FLINK-28241 URL: https://issues.apache.org/jira/browse/FLINK-28241 Project: Flink Issue Type: Bug Reporter: Anton Kalashnikov
FLINK-27921 added the property `REQUIREMENTS_CHECK_DELAY`, which optimizes the number of resource checks but also leads to a small delay during the init phase, which is totally uncritical in the normal scenario. But unfortunately, our benchmarks cannot exclude the time of the init phase (or shutdown phase) from the total benchmark time. As a result, all benchmarks that have a relatively short execution time (compared to the init phase time) show a regression (up to 50%). As a quick solution, it makes sense to just set `REQUIREMENTS_CHECK_DELAY` to zero for all benchmarks. A non-exhaustive list of affected benchmarks:
* asyncWait
* globalWindow
* mapSink
* readFileSplit
* remoteRebalance
* serializer*
* sorted*
* twoInputOneIdleMapSink
-- This message was sent by Atlassian Jira (v8.20.7#820007)
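The "up to 50%" figure follows directly from the phase arithmetic: since the benchmark measures init + work, a fixed init-phase delay comparable to the measured work roughly doubles the reported time, while long-running benchmarks barely notice it. A toy model with made-up numbers (not real benchmark data):

```java
// Toy model: benchmarks measure init + work, so a fixed init-phase delay
// inflates short benchmarks far more than long ones. Numbers are invented.
class RegressionModel {
    /** Relative slowdown caused by adding `delayMs` to a run of `workMs`. */
    static double slowdownPercent(double workMs, double delayMs) {
        return delayMs / (workMs + delayMs) * 100.0;
    }
}
```

A 100 ms delay on a 100 ms benchmark is a 50% regression of the total; on a 10 s benchmark it is below 1%.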
[jira] [Created] (FLINK-26326) FLIP-203: Support native and incremental savepoints 1.1
Anton Kalashnikov created FLINK-26326: - Summary: FLIP-203: Support native and incremental savepoints 1.1 Key: FLINK-26326 URL: https://issues.apache.org/jira/browse/FLINK-26326 Project: Flink Issue Type: New Feature Components: Runtime / Checkpointing Reporter: Anton Kalashnikov Fix For: 1.16.0
This is the second part of the implementation of [https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints] The first part: FLINK-25276
Motivation: currently, with non-incremental canonical-format savepoints and very large state, both taking a savepoint and recovering from one can take a very long time. Providing options to take native-format and incremental savepoints would alleviate this problem. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26325) Add test coverage for native format State Processor API
Anton Kalashnikov created FLINK-26325: - Summary: Add test coverage for native format State Processor API Key: FLINK-26325 URL: https://issues.apache.org/jira/browse/FLINK-26325 Project: Flink Issue Type: Sub-task Components: API / State Processor Reporter: Anton Kalashnikov
https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints
Check test coverage for:
* State Processor API reading part
Right now we have tests that cover only reading from a CANONICAL savepoint, but it makes sense to add the same tests for a NATIVE savepoint and an aligned checkpoint. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26134) Document checkpoint/savepoint guarantees
Anton Kalashnikov created FLINK-26134: - Summary: Document checkpoint/savepoint guarantees Key: FLINK-26134 URL: https://issues.apache.org/jira/browse/FLINK-26134 Project: Flink Issue Type: Improvement Components: Documentation, Runtime / Checkpointing Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov Document checkpoint/savepoint guarantees according to table from https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Proposal -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26130) Document why and when user would like to increase network buffer size
Anton Kalashnikov created FLINK-26130: - Summary: Document why and when user would like to increase network buffer size Key: FLINK-26130 URL: https://issues.apache.org/jira/browse/FLINK-26130 Project: Flink Issue Type: Improvement Components: Documentation, Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Expand https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/#the-number-of-in-flight-buffers with more details. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26002) Add test coverage for native format job upgrades
Anton Kalashnikov created FLINK-26002: - Summary: Add test coverage for native format job upgrades Key: FLINK-26002 URL: https://issues.apache.org/jira/browse/FLINK-26002 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.15.0 Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov Fix For: 1.15.0
# Initialization of a job with a stateful operator.
# Take a native savepoint / canonical savepoint / aligned checkpoint
## Change the job shape (new custom operator) and/or the record types
## One job, with all possible network exchanges (forward, keyBy, rebalance, broadcast, random, rescale), followed by a stateful operator.
## No need to modify the state schema. Just validate in some way that after the upgrade the state is assigned to the correct operators (so the state should affect the result of the operators/functions)
## Some record type that would allow us to validate the consistency of the computations
## A validating sink, checking for consistency
## Arbitrary job upgrade. Add two new operators: a chained mapping operator and another keyed operator preceded by a keyed exchange. Change the record type from int to string without changing the "logical" value of the record (for example, change 1 → "1")
## Job upgrade without changing the graph record type: no need to test for that
# Restore from the savepoint/checkpoint -- This message was sent by Atlassian Jira (v8.20.1#820001)
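One concrete check from the list above can be sketched in isolation: when the record type changes from int to String without changing the "logical" value (1 → "1"), a validating sink only needs to compare logical values across the upgrade. The class and method names below are illustrative, not part of any proposed test:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of validating an upgrade that changes the record type from int to
// String while preserving the logical value (1 -> "1"). Names are invented.
class UpgradeValidation {
    /** The upgraded job's mapping: same logical value, new type. */
    static String migrate(int record) {
        return Integer.toString(record);
    }

    /** A validating sink: outputs before and after must agree on logical values. */
    static boolean logicallyEqual(List<Integer> before, List<String> after) {
        List<String> expected =
            before.stream().map(Integer::toString).collect(Collectors.toList());
        return expected.equals(after);
    }
}
```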
[jira] [Created] (FLINK-25891) NoClassDefFoundError AsyncSSLPrivateKeyMethod in benchmark
Anton Kalashnikov created FLINK-25891: - Summary: NoClassDefFoundError AsyncSSLPrivateKeyMethod in benchmark Key: FLINK-25891 URL: https://issues.apache.org/jira/browse/FLINK-25891 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.15.0 Reporter: Anton Kalashnikov -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25650) Document unaligned checkpoints performance limitations (larger records/flat map/timers/...)
Anton Kalashnikov created FLINK-25650: - Summary: Document unaligned checkpoints performance limitations (larger records/flat map/timers/...) Key: FLINK-25650 URL: https://issues.apache.org/jira/browse/FLINK-25650 Project: Flink Issue Type: Improvement Components: Documentation, Runtime / Checkpointing Affects Versions: 1.14.0 Reporter: Anton Kalashnikov
An unaligned checkpoint can be delayed if the current record takes too long to consume (because it is too large, or it is a flat map, etc.), which can be pretty confusing. So it makes sense to document this limitation to give users an understanding of this situation. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25646) Document buffer debloating issues with high parallelism
Anton Kalashnikov created FLINK-25646: - Summary: Document buffer debloating issues with high parallelism Key: FLINK-25646 URL: https://issues.apache.org/jira/browse/FLINK-25646 Project: Flink Issue Type: Improvement Reporter: Anton Kalashnikov
According to the latest benchmarks, there are some problems with buffer debloating when a job has high parallelism. "High parallelism" means a different value from job to job, but in general it is more than 200. So it makes sense to document that problem and propose the solution: increasing the number of buffers. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25590) Logging warning of insufficient memory for all configured buffers
Anton Kalashnikov created FLINK-25590: - Summary: Logging warning of insufficient memory for all configured buffers Key: FLINK-25590 URL: https://issues.apache.org/jira/browse/FLINK-25590 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.15.0 Reporter: Anton Kalashnikov
Right now, if the exclusive buffers for the input channels and one buffer for each subpartition are allocated successfully on start, but there is not enough memory for the rest of the buffers (floating buffers, the remaining buffers for subpartitions), then we see nothing about that in the log (as I understand it). So first of all, we need to check what logs we currently have about situations when Flink doesn't have enough memory for all configured buffers. And if we have nothing (or not enough), we should add such a log. -- This message was sent by Atlassian Jira (v8.20.1#820001)
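The proposed log could look roughly like this. This is only a sketch: the real buffer accounting lives in Flink's network stack, and the class, method, and message text here are all made up for illustration.

```java
import java.util.logging.Logger;

// Sketch of the proposed warning: after the startup allocation, compare what
// was actually allocated against what was configured and warn about the gap.
class BufferAllocationCheck {
    private static final Logger LOG = Logger.getLogger("network");

    /** Returns the warning text, or null if all configured buffers fit. */
    static String checkAllocatedBuffers(int configured, int allocated) {
        if (allocated >= configured) {
            return null; // everything fit, nothing to report
        }
        String msg = String.format(
            "Only %d of %d configured network buffers could be allocated; "
                + "floating buffers may be missing and throughput can degrade.",
            allocated, configured);
        LOG.warning(msg);
        return msg;
    }
}
```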
[jira] [Created] (FLINK-25556) Extra waiting of final checkpoint in benchmarks
Anton Kalashnikov created FLINK-25556: - Summary: Extra waiting of final checkpoint in benchmarks Key: FLINK-25556 URL: https://issues.apache.org/jira/browse/FLINK-25556 Project: Flink Issue Type: Bug Components: Benchmarks, Runtime / Checkpointing Reporter: Anton Kalashnikov
After finishing https://issues.apache.org/jira/browse/FLINK-25105, Flink always waits for the final checkpoint after all tasks have finished, which leads to a prolongation of the test and incorrect benchmark results. The idea is to disable `ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH` for all benchmarks. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25454) Negative time in throughput calculator
Anton Kalashnikov created FLINK-25454: - Summary: Negative time in throughput calculator Key: FLINK-25454 URL: https://issues.apache.org/jira/browse/FLINK-25454 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Found during the random test: {noformat} 2021-12-23 11:52:01,645 WARN org.apache.flink.runtime.taskmanager.Task [] - KeyedProcess -> Sink: Unnamed (3/3)#0 (1321490f33c6370f2d68c413a8a0b0c1) switched from RUNNING to FAILED with failure cause: java.lang.IllegalArgumentException: Time should be non negative at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:90) at org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:81) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.triggerDebloating(SingleInputGate.java:414) at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.triggerDebloating(InputGateWithMetrics.java:90) at org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:786) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$scheduleBufferDebloater$3(StreamTask.java:777) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:801) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:750) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.base/java.lang.Thread.run(Unknown Source) {noformat} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24738) Fail during announcing buffer size to released local channel
Anton Kalashnikov created FLINK-24738: - Summary: Fail during announcing buffer size to released local channel Key: FLINK-24738 URL: https://issues.apache.org/jira/browse/FLINK-24738 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov
Since we can release all resources not only when the mailbox is finished but also from within the mailbox: {noformat} org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.releaseAllResources(LocalInputChannel.java:331) org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.transformEvent(SingleInputGate.java:808) org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.transformToBufferOrEvent(SingleInputGate.java:757) org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:687) org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:666) org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:142) org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:150) org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:503) org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:816) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:768) org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:936) 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:750) org.apache.flink.runtime.taskmanager.Task.run(Task.java:571) {noformat} It is possible that afterwards the BufferDebloater announces a new buffer size, which fails because the channel has already been released: {noformat} Caused by: java.lang.IllegalStateException: Channel released. at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.announceBufferSize(LocalInputChannel.java:354) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.announceBufferSize(SingleInputGate.java:389) at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.announceBufferSize(InputGateWithMetrics.java:102) at org.apache.flink.streaming.runtime.tasks.bufferdebloat.BufferDebloater.recalculateBufferSize(BufferDebloater.java:101) at org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:801) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:791) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:816) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:768) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:936) at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:750) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571) at java.lang.Thread.run(Thread.java:748) {noformat} So I think we should replace the `checkState` with an `if` in LocalInputChannel#announceBufferSize, since a released channel is expected there. -- This message was sent by Atlassian Jira (v8.3.4#803005)
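The suggested change can be sketched on a simplified channel (a stand-in class, not the real LocalInputChannel): instead of a `checkState(!released)` that throws, the announcement becomes a no-op for a released channel.

```java
// Sketch of replacing a hard precondition with a tolerant early return:
// a released channel is a legal state here, so announcing to it is a no-op.
class Channel {
    private boolean released = false;
    private int announcedBufferSize = -1;

    void releaseAllResources() { released = true; }

    // Before: Preconditions.checkState(!released, "Channel released.");
    void announceBufferSize(int newBufferSize) {
        if (released) {
            return; // expected during shutdown; nothing to announce
        }
        announcedBufferSize = newBufferSize;
    }

    int getAnnouncedBufferSize() { return announcedBufferSize; }
}
```

The design trade-off: a precondition documents "this must never happen", while the `if` documents "this is a normal race"; since release can now happen from inside the mailbox, the second reading is the correct one.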
[jira] [Created] (FLINK-24734) testSecretOption fails due to missing required options
Anton Kalashnikov created FLINK-24734: - Summary: testSecretOption fails due to missing required options Key: FLINK-24734 URL: https://issues.apache.org/jira/browse/FLINK-24734 Project: Flink Issue Type: Bug Affects Versions: 1.13.3 Reporter: Anton Kalashnikov {noformat} java.lang.AssertionError: Expected: (an instance of org.apache.flink.table.api.ValidationException and Expected failure cause is ) but: Expected failure cause is The throwable does not contain the expected failure cause Stacktrace was: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default.default.t1'. Table options are: 'buffer-size'='1000''connector'='test-connector''key.format'='test-format''key.test-format.delimiter'=',''password'='**''property-version'='1''value.format'='test-format''value.test-format.delimiter'='|''value.test-format.fail-on-missing'='true' at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:140) at org.apache.flink.table.factories.utils.FactoryMocks.createTableSource(FactoryMocks.java:58) at org.apache.flink.table.factories.FactoryUtilTest.testError(FactoryUtilTest.java:374) at org.apache.flink.table.factories.FactoryUtilTest.testSecretOption(FactoryUtilTest.java:162) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239) at 
org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)Caused by: org.apache.flink.table.api.ValidationException: One or more required options are missing. Missing required options are: target at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:384) at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:357) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:718) at org.apache.flink.table.factories.TestDynamicTableFactory.createDynamicTableSource(TestDynamicTableFactory.java:80) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137) ... 
27 more at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.junit.rules.ExpectedException.handleException(ExpectedException.java:252) at org.junit.rules.ExpectedException.access$000(ExpectedException.java:106) at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:241) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at
[jira] [Created] (FLINK-24690) Clarification of buffer size threshold calculation in BufferDebloater
Anton Kalashnikov created FLINK-24690: - Summary: Clarification of buffer size threshold calculation in BufferDebloater Key: FLINK-24690 URL: https://issues.apache.org/jira/browse/FLINK-24690 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov
It looks like the variable `skipUpdate` in BufferDebloater#recalculateBufferSize is calculated in a non-obvious way. For example, if `taskmanager.network.memory.buffer-debloat.threshold-percentages` is set to 50 (meaning 50%), then it will be something like:
* 32000 -> 16000 (possible)
* 32000 -> 17000 (not possible)
* 16000 -> 24000 (not possible) - even though 16000 + 50% = 24000
* 16000 -> 32000 (only this is possible)
This happens because the algorithm takes into account only the larger value. So in the `16000 -> 24000` example it would calculate 50% of 24000, so only the transition 12000 -> 24000 is possible. In general, this approach is not so bad, especially on small values (instead of 256 -> 384, the minimum possible transition is 256 -> 512). But we should clarify it somewhere, with a test or javadoc or both. Also, we can discuss changing this algorithm to a more natural implementation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
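The behavior described above matches a rule like "skip the update when the change is smaller than the threshold percentage of the larger of the two sizes". A sketch reproducing the listed transitions (my reading of the description, not the actual BufferDebloater code):

```java
// Sketch: skipUpdate fires when |new - old| is less than threshold% of the
// LARGER of the two values, which reproduces the ticket's examples.
class DebloatThreshold {
    static boolean skipUpdate(int oldSize, int newSize, int thresholdPercent) {
        int larger = Math.max(oldSize, newSize);
        int minDelta = larger * thresholdPercent / 100;
        return Math.abs(newSize - oldSize) < minDelta;
    }
}
```

With a 50% threshold this also explains the small-value case: a jump from 256 upward is only accepted once the target reaches 512, because 50% of the larger value must fit into the delta.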
[jira] [Created] (FLINK-24671) Possible NPE in LocalInputChannel#getBuffersInUseCount before initialization of subpartitionView
Anton Kalashnikov created FLINK-24671: - Summary: Possible NPE in LocalInputChannel#getBuffersInUseCount before initialization of subpartitionView Key: FLINK-24671 URL: https://issues.apache.org/jira/browse/FLINK-24671 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov
If the buffer debloater tries to get the number of buffers in use before the subpartitionView is initialized (or after it is released?), an NPE is possible in LocalInputChannel#getBuffersInUseCount. -- This message was sent by Atlassian Jira (v8.3.4#803005)
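One plausible defensive fix is to treat a not-yet-initialized (or already released) view as "zero buffers in use". The classes below are simplified stand-ins for the real ones, used only to illustrate the null-safe shape:

```java
// Sketch of the defensive fix: read the possibly-null view once into a
// local, then fall back to 0 instead of risking an NPE. Names are invented.
class SubpartitionView {
    int getNumberOfQueuedBuffers() { return 7; } // dummy value for the sketch
}

class InputChannelSketch {
    private volatile SubpartitionView subpartitionView; // null until setup

    void setup(SubpartitionView view) { this.subpartitionView = view; }

    int getBuffersInUseCount() {
        // Read once: the field may be nulled/set concurrently by another thread.
        SubpartitionView view = subpartitionView;
        return view == null ? 0 : view.getNumberOfQueuedBuffers();
    }
}
```

Reading the volatile field into a local before the null check matters: checking `subpartitionView != null` and then dereferencing the field again would leave the same race open.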
[jira] [Created] (FLINK-24659) Two active miniCluster in RemoteBenchmarkBase
Anton Kalashnikov created FLINK-24659: - Summary: Two active miniCluster in RemoteBenchmarkBase Key: FLINK-24659 URL: https://issues.apache.org/jira/browse/FLINK-24659 Project: Flink Issue Type: Bug Components: Benchmarks, Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov
It seems that all children of RemoteBenchmarkBase work incorrectly, since they configure the environment for the miniCluster from FlinkEnvironmentContext but in reality use the miniCluster from RemoteBenchmarkBase. So we should definitely remove one of them. I think we can get rid of RemoteBenchmarkBase#miniCluster and use FlinkEnvironmentContext#miniCluster everywhere. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24658) Debug logs for buffer size calculation
Anton Kalashnikov created FLINK-24658: - Summary: Debug logs for buffer size calculation Key: FLINK-24658 URL: https://issues.apache.org/jira/browse/FLINK-24658 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Since the buffer debloater recalculates the buffer size based on several different parameters, it makes sense to add debug logging that prints all of them in case debugging becomes necessary. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24657) Add metric of the total real size of input/output buffers queue
Anton Kalashnikov created FLINK-24657: - Summary: Add metric of the total real size of input/output buffers queue Key: FLINK-24657 URL: https://issues.apache.org/jira/browse/FLINK-24657 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Right now we have a metric for the length of the input/output buffer queue, but since the buffer debloater was introduced this metric is not always helpful because the real size of each buffer can differ. So the idea is to add a new metric that shows the total size of the buffers in the queue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24589) FLIP-183: Buffer debloating 1.2
Anton Kalashnikov created FLINK-24589: - Summary: FLIP-183: Buffer debloating 1.2 Key: FLINK-24589 URL: https://issues.apache.org/jira/browse/FLINK-24589 Project: Flink Issue Type: New Feature Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24578) Unexpected erratic load shape for channel skew load profile
Anton Kalashnikov created FLINK-24578: - Summary: Unexpected erratic load shape for channel skew load profile Key: FLINK-24578 URL: https://issues.apache.org/jira/browse/FLINK-24578 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Attachments: antiphaseBufferSize.png, erraticBufferSize1.png, erraticBufferSize2.png given: A job with 5 maps (with keyBy). All channels are remote. Parallelism is 80. The first task produces only two keys - `indexOfThisSubtask` and `indexOfThisSubtask + 1`. So every subTask has a constant set of active channels (depending on hash rebalance). Every record has an equal size and is processed for an equal time. when: The buffer debloat is enabled with the default configuration. then: The buffer size synchronizes across every subTask on the first map for some reason. It can show strong synchronization as in the erraticBufferSize1 picture, but usually the synchronization is less explicit, as in erraticBufferSize2. !erraticBufferSize1.png! Expected: After the stabilization period the buffer size should be mostly constant with small fluctuations, or the different tasks should be in antiphase to each other (when one subtask has a small buffer size, another should have a big one), as in the antiphaseBufferSize picture. !antiphaseBufferSize.png! Unfortunately, it is not reproduced every time, which means this problem can be connected to the environment. But at least it makes sense to try to understand why we see such a strange load shape when only several input channels are active. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24553) Change buffer debloating default configuration values
Anton Kalashnikov created FLINK-24553: - Summary: Change buffer debloating default configuration values Key: FLINK-24553 URL: https://issues.apache.org/jira/browse/FLINK-24553 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.14.0 Reporter: Anton Kalashnikov After the investigation of buffer debloating effectiveness, there are some conclusions: * taskmanager.memory.min-segment-size can be decreased from 1024 to at least 256 because in some cases even 1024 is too big a value, and at the same time a low min value is not a problem. * taskmanager.network.memory.buffer-debloat.samples can be decreased from 20 to 10, or taskmanager.network.memory.buffer-debloat.period can be decreased from 500ms to 100ms or 200ms. According to the investigation, the current speed of reaction is too slow, so it is better to increase it by changing one of these parameters. * taskmanager.network.memory.buffer-debloat.threshold-percentages can be decreased from 50 to 10 because no problems were found when the announcement of the buffer size happened more frequently, and it can actually positively influence the checkpoint time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24551) BUFFER_DEBLOAT_SAMPLES property is taken from the wrong configuration
Anton Kalashnikov created FLINK-24551: - Summary: BUFFER_DEBLOAT_SAMPLES property is taken from the wrong configuration Key: FLINK-24551 URL: https://issues.apache.org/jira/browse/FLINK-24551 Project: Flink Issue Type: Bug Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov Right now, StreamTask receives the BUFFER_DEBLOAT_SAMPLES property from the taskConfiguration which is wrong. The right place for the debloat configuration is taskManagerConfiguration. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24481) Translate buffer debloater documentation to Chinese
Anton Kalashnikov created FLINK-24481: - Summary: Translate buffer debloater documentation to Chinese Key: FLINK-24481 URL: https://issues.apache.org/jira/browse/FLINK-24481 Project: Flink Issue Type: Improvement Components: Documentation, Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Translate the buffer debloater documentation to Chinese. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24470) EMA for buffer debloat level
Anton Kalashnikov created FLINK-24470: - Summary: EMA for buffer debloat level Key: FLINK-24470 URL: https://issues.apache.org/jira/browse/FLINK-24470 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov Right now, we use an EMA for smoothing the throughput calculation. But there is an idea to use the EMA at the buffer debloat (BufferDebloater) level because it has more information available (old buffer size, current throughput, buffers-in-use count, etc.) and could therefore be used more effectively. NB. This is just an idea which makes sense to try, but if the new implementation doesn't bring real profit it makes sense to keep the old one. -- This message was sent by Atlassian Jira (v8.3.4#803005)
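For reference, a plain exponential moving average over calculated buffer sizes could look like the sketch below (illustrative only; the actual smoothing in Flink may differ in both placement and formula).

```java
// Hedged sketch: EMA applied to the sequence of calculated buffer sizes,
// as the ticket proposes to do at the BufferDebloater level.
final class BufferSizeEma {
    private final double alpha; // smoothing factor in (0, 1]; higher = reacts faster
    private double value;
    private boolean initialized;

    BufferSizeEma(double alpha) {
        this.alpha = alpha;
    }

    int update(int newBufferSize) {
        if (!initialized) {
            value = newBufferSize; // seed with the first observation
            initialized = true;
        } else {
            value = alpha * newBufferSize + (1 - alpha) * value;
        }
        return (int) Math.round(value);
    }
}
```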
[jira] [Created] (FLINK-24469) Incorrect calculation of the buffer size in case of channel data skew
Anton Kalashnikov created FLINK-24469: - Summary: Incorrect calculation of the buffer size in case of channel data skew Key: FLINK-24469 URL: https://issues.apache.org/jira/browse/FLINK-24469 Project: Flink Issue Type: Bug Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov If only a couple out of 100 input channels are active, a huge throughput drop is observed. I suppose it happens because of this line inside SingleInputGate#getBuffersInUseCount: {noformat} Math.max(1, channel.getBuffersInUseCount()); {noformat} Right now, I don't think it makes sense to have the min value as 1; perhaps zero buffers in use is also ok. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24468) NPE when notifyNewBufferSize
Anton Kalashnikov created FLINK-24468: - Summary: NPE when notifyNewBufferSize Key: FLINK-24468 URL: https://issues.apache.org/jira/browse/FLINK-24468 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov In my opinion, we have two problems here: # The exception itself (see below) # Ignoring the exception and stopping the rescheduling of the buffer size calculation. Of course, we need to fix this NPE, and we need to think about what we want to do if the buffer debloat fails with an error. {noformat} java.lang.NullPointerException: null at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.notifyNewBufferSize(CreditBasedPartitionRequestClientHandler.java:135) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.notifyNewBufferSize(NettyPartitionRequestClient.java:203) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyNewBufferSize(RemoteInputChannel.java:330) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.announceBufferSize(RemoteInputChannel.java:299) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.announceBufferSize(SingleInputGate.java:389) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.announceBufferSize(InputGateWithMetrics.java:102) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.bufferdebloat.BufferDebloater.recalculateBufferSize(BufferDebloater.java:118) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:795) 
~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:784) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_282] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:814) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:754) [flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT] at 
java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24467) Set min and max buffer size even if the difference is less than the threshold
Anton Kalashnikov created FLINK-24467: - Summary: Set min and max buffer size even if the difference is less than the threshold Key: FLINK-24467 URL: https://issues.apache.org/jira/browse/FLINK-24467 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov Right now, we apply a new buffer size only if it differs from the old buffer size by more than the configured threshold. But if the old buffer size is closer to the max or min value than this threshold allows, we always get stuck on that value. For example, if the old buffer size is 22k and our threshold is 50%, then the next value we could apply would have to be 33k, but this is impossible because the max value is 32k. So once we calculate the buffer size as 22k, it is impossible to increase it. The suggestion is to apply the change every time the new value is calculated as the min or max size and the old value was different. -- This message was sent by Atlassian Jira (v8.3.4#803005)
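The suggestion above can be sketched roughly like this (an assumed reconstruction with illustrative names, not the actual BufferDebloater code): clamp the calculated size to the configured bounds first, and bypass the threshold check whenever the clamped value hit a bound and actually changed.

```java
// Hedged sketch of the proposed fix: a value that lands on min/max is applied
// even when the change is smaller than the threshold, so the size cannot get
// stuck just below a bound (e.g. stuck at 22k with max 32k and a 50% threshold).
final class ClampSketch {
    static int newBufferSize(
            int oldSize, int calculated, int minSize, int maxSize, int thresholdPercent) {
        int clamped = Math.min(maxSize, Math.max(minSize, calculated));
        boolean underThreshold =
                Math.abs(clamped - oldSize)
                        < Math.max(oldSize, clamped) * thresholdPercent / 100;
        // Proposed rule: apply anyway when the clamped value hit a bound and changed.
        boolean hitBound = (clamped == minSize || clamped == maxSize) && clamped != oldSize;
        return (underThreshold && !hitBound) ? oldSize : clamped;
    }
}
```

With old size 22000, a calculated 33000, and bounds [256, 32768], the plain threshold rule would keep 22000 forever; the bound rule lets it reach the max.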
[jira] [Created] (FLINK-24294) Resources leak in the StreamTask constructor
Anton Kalashnikov created FLINK-24294: - Summary: Resources leak in the StreamTask constructor Key: FLINK-24294 URL: https://issues.apache.org/jira/browse/FLINK-24294 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Since we initialize a lot of resources in the StreamTask constructor (RecordWriter, timerServices, etc.), it is possible that some of these resources remain open if an exception happens later in the same constructor. So in my opinion, we have two choices here: * Avoid allocating resources in the constructor, which allows us to do something like: {noformat} StreamTask task = new StreamTask(); // no leaks if it fails try { task.init(); } finally { task.cleanUp(); } {noformat} * Or we can rewrite the code in such a way that an exception in any constructor (e.g. StreamTask) guarantees releasing the resources allocated earlier in that constructor. But this is not so easy to implement (see the initialization of recordWriter in the StreamTask constructor). So perhaps it makes sense to separate creating the object from initialization (allocating resources). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24233) Receiving new buffer size before network reader configured
Anton Kalashnikov created FLINK-24233: - Summary: Receiving new buffer size before network reader configured Key: FLINK-24233 URL: https://issues.apache.org/jira/browse/FLINK-24233 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Fix For: 1.14.1 {noformat} 2021-09-09 14:36:42,383 WARN org.apache.flink.runtime.taskmanager.Task [] - Map -> Flat Map (71/75)#0 (7a5b971e0cd57aa5d057a114e2679b03) switched from RUNNING to FAILED with failure c ause: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'ip-172-31-22-183.eu-central-1.compute.internal/172.31.22.183:42085'. at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:339) at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:240) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalStateException: No reader for receiverId = 296559f497c54a82534945f4549b9e2d exists. 
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.obtainReader(PartitionRequestQueue.java:194) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.notifyNewBufferSize(PartitionRequestQueue.java:188) at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:134) at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311) at
[jira] [Created] (FLINK-24231) Buffer debloating microbenchmark for multiple gates
Anton Kalashnikov created FLINK-24231: - Summary: Buffer debloating microbenchmark for multiple gates Key: FLINK-24231 URL: https://issues.apache.org/jira/browse/FLINK-24231 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Fix For: 1.15.0 We need to expand the microbenchmark from https://issues.apache.org/jira/browse/FLINK-24230 with a scenario where different gates have: * different throughput * different record size -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24230) Buffer debloating microbenchmark for single gate
Anton Kalashnikov created FLINK-24230: - Summary: Buffer debloating microbenchmark for single gate Key: FLINK-24230 URL: https://issues.apache.org/jira/browse/FLINK-24230 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Fix For: 1.15.0 Currently, there are no microbenchmarks that check buffer debloating effectiveness. The idea is to create one which measures the checkpoint time. The benchmark should be similar to `UnalignedCheckpointTimeBenchmark`, but unlike `UnalignedCheckpointTimeBenchmark`, where we see the effect of `Buffer debloat` only for extremely small values like 1ms for BUFFER_DEBLOAT_TARGET, this benchmark should provide a more reliable way to compare different implementations of `Buffer debloat`. This can be reached by increasing at least the record size and the checkpoint interval. The main target is to measure how long it takes to do the checkpoint during backpressure when all buffers are full. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24191) Adjusting number of buffers besides buffer size
Anton Kalashnikov created FLINK-24191: - Summary: Adjusting number of buffers besides buffer size Key: FLINK-24191 URL: https://issues.apache.org/jira/browse/FLINK-24191 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Fix For: 1.15.0 "Buffer debloat" adjusts only the buffer size, but it also makes sense to adjust the number of buffers. It is not clear for now what should be adjusted and in which proportions, so we need to think about how to figure this out. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24190) Handling large record with buffer debloat
Anton Kalashnikov created FLINK-24190: - Summary: Handling large record with buffer debloat Key: FLINK-24190 URL: https://issues.apache.org/jira/browse/FLINK-24190 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Fix For: 1.15.0 If the buffer size becomes too small (less than the record size) due to buffer debloat, it can lead to performance degradation. It looks like it is better to keep the buffer size at least as large as the record size (or even greater). So we need to check how bad this can be and fix it. The implementation should be easy: we can choose the maximum of desirableBufferSize and recordSize when requesting a new buffer (BufferWritingResultPartition#addToSubpartition). -- This message was sent by Atlassian Jira (v8.3.4#803005)
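The suggested implementation is essentially a one-liner; a minimal sketch with hypothetical names (not the actual BufferWritingResultPartition signature):

```java
// Hedged sketch of the proposed behaviour: when requesting a new buffer,
// never go below the size of the record that is about to be written.
final class BufferRequestSketch {
    static int bufferSizeToRequest(int desirableBufferSize, int recordSize) {
        return Math.max(desirableBufferSize, recordSize);
    }
}
```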
[jira] [Created] (FLINK-24189) Debloating buffer for multiple gates
Anton Kalashnikov created FLINK-24189: - Summary: Debloating buffer for multiple gates Key: FLINK-24189 URL: https://issues.apache.org/jira/browse/FLINK-24189 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Anton Kalashnikov Fix For: 1.15.0 Right now, the buffer debloat assumes that it works with SingleInputGates which have a similar load. It needs to support UnionInputGate in case of data skew. A possible implementation is calculating the throughput separately for each gate, or calculating the total throughput but choosing the buffer size independently for each gate based on its buffers in use. It is highly important to fairly share the throughput among all gates. In other words, avoid the situation: * gate1 has a low load while gate2 has a high load * a small buffer size was set for gate1 and a big buffer size for gate2 * the load for gate1 increases up to the load of gate2 * it is impossible to increase the buffer size for gate1 because there is no reason to decrease the buffer size for gate2, since its load didn't change -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23975) High checkpoint time for unaligned checkpoint when throttling in source
Anton Kalashnikov created FLINK-23975: - Summary: High checkpoint time for unaligned checkpoint when throttling in source Key: FLINK-23975 URL: https://issues.apache.org/jira/browse/FLINK-23975 Project: Flink Issue Type: Bug Reporter: Anton Kalashnikov According to a brief test, adding a delay in the source before emitting (outside of the checkpoint lock synchronization) leads to an increase of the unaligned checkpoint time (in my case from 1 sec to 15 sec). Approximate logic for throttling in the source: {noformat} @Override public void run(SourceContext sourceContext) throws Exception { while (running) { LockSupport.parkNanos(sleepNanos); synchronized (sourceContext.getCheckpointLock()) { sourceContext.collect(new Type(ThreadLocalRandom.current().nextLong(), time, payload)); } } } {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23974) Low throughput for buffer debloat in case of different load profile
Anton Kalashnikov created FLINK-23974: - Summary: Low throughput for buffer debloat in case of different load profile Key: FLINK-23974 URL: https://issues.apache.org/jira/browse/FLINK-23974 Project: Flink Issue Type: Sub-task Reporter: Anton Kalashnikov According to task https://issues.apache.org/jira/browse/FLINK-23456, we have some performance drop when the buffer debloat is enabled: * for a static load with throttling in the source - 1-3% * for a sinus-shape load with throttling in the source - 2-3% * for an erratic load with throttling in the source - 5-6% We need to investigate the reason for that and try to improve it if possible. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23973) FLIP-183: Buffer debloat 1.1
Anton Kalashnikov created FLINK-23973: - Summary: FLIP-183: Buffer debloat 1.1 Key: FLINK-23973 URL: https://issues.apache.org/jira/browse/FLINK-23973 Project: Flink Issue Type: New Feature Reporter: Anton Kalashnikov Second umbrella ticket for [FLIP-183: Dynamic buffer size adjustment|https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment]. This ticket should collect tasks for improving the first version of the buffer debloat feature. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23928) Test the buffer debloating
Anton Kalashnikov created FLINK-23928: - Summary: Test the buffer debloating Key: FLINK-23928 URL: https://issues.apache.org/jira/browse/FLINK-23928 Project: Flink Issue Type: Sub-task Reporter: Anton Kalashnikov # Configure the buffer debloat by setting `taskmanager.network.memory.buffer-debloat.enabled` to true # Start the job with a long checkpoint time # Check the alignment time for each subtask; it should be near taskmanager.network.memory.buffer-debloat.target # Check the checkpoint time; it should be near taskCount * taskmanager.network.memory.buffer-debloat.target # Try to change taskmanager.network.memory.buffer-debloat.target. The checkpoint time should change correspondingly. # Check the metrics estimatedTimeToConsumerBuffersMs and debloatedBufferSize, which should also depend on the current debloat configuration. Other configuration which can be changed for the test: * taskmanager.memory.min-segment-size (if this is equal to taskmanager.memory.segment-size then the behaviour should be the same as with buffer debloat disabled) * taskmanager.network.memory.buffer-debloat.period * taskmanager.network.memory.buffer-debloat.samples * taskmanager.network.memory.buffer-debloat.threshold-percentages -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23870) Test the ability for ignoring in-flight data on recovery
Anton Kalashnikov created FLINK-23870: - Summary: Test the ability for ignoring in-flight data on recovery Key: FLINK-23870 URL: https://issues.apache.org/jira/browse/FLINK-23870 Project: Flink Issue Type: Technical Debt Reporter: Anton Kalashnikov Test for https://issues.apache.org/jira/browse/FLINK-22684: # Configure the externalized checkpoint for the unaligned checkpoint # Wait for the checkpoint, then stop the job # Restore from the last checkpoint - it should be restored to the last state (optional step) # Set `execution.checkpointing.recover-without-channel-state.checkpoint-id` to the last checkpoint # Restore from the last checkpoint - the in-flight data will be ignored, so a little less data than in the usual case will be restored. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23726) Buffer debloat configuration is taken from the wrong configuration
Anton Kalashnikov created FLINK-23726: - Summary: Buffer debloat configuration is taken from the wrong configuration Key: FLINK-23726 URL: https://issues.apache.org/jira/browse/FLINK-23726 Project: Flink Issue Type: Bug Components: Runtime / Task Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov Right now, StreamTask receives the buffer debloat configuration from the taskConfiguration which is wrong. The right place for the debloat configuration is taskManagerConfiguration. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23459) New metrics for dynamic buffer size
Anton Kalashnikov created FLINK-23459: - Summary: New metrics for dynamic buffer size Key: FLINK-23459 URL: https://issues.apache.org/jira/browse/FLINK-23459 Project: Flink Issue Type: Sub-task Reporter: Anton Kalashnikov The following metrics can be added: * The total size of buffered in-flight data (both detailed per gate + sum per subtask) * Total estimated time to process the data (both detailed per gate + max per subtask) * Actual value of the calculated dynamic buffer size (only detailed per gate) All of those metrics should be exposed only on the input side, taking into account only input in-flight data. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23458) Document required number of buffers in the network stack
Anton Kalashnikov created FLINK-23458: - Summary: Document required number of buffers in the network stack Key: FLINK-23458 URL: https://issues.apache.org/jira/browse/FLINK-23458 Project: Flink Issue Type: Sub-task Reporter: Anton Kalashnikov Currently it seems that it's not mentioned anywhere what the required number of buffers is, and that this number is different from the configured number of exclusive/floating buffers. It should be mentioned that both {{taskmanager.network.memory.floating-buffers-per-gate}} and {{taskmanager.network.memory.buffers-per-channel}} are best effort and not guaranteed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23457) Sending the buffer of the right size for broadcast
Anton Kalashnikov created FLINK-23457: - Summary: Sending the buffer of the right size for broadcast Key: FLINK-23457 URL: https://issues.apache.org/jira/browse/FLINK-23457 Project: Flink Issue Type: Sub-task Reporter: Anton Kalashnikov It is not enough to know just the number of available buffers (credits) for the downstream because the size of these buffers can be different. So we propose to resolve this problem in the following way: if the downstream buffer size is changed, the upstream should send a buffer of a size not greater than the new one, regardless of how big the current buffer on the upstream is (pollBuffer should receive a parameter like bufferSize and return a buffer not greater than it). The downstream will be able to support any buffer size < max buffer size, so it should be good enough to request a BufferBuilder with the new size after getting the announcement, leaving existing BufferBuilder/BufferConsumers unchanged. In other words, the code in {{PipelinedSubpartition(View)}} doesn't need to be changed (apart from forwarding the new buffer size to the {{BufferWritingResultPartition}}). All buffer size adjustments can be implemented exclusively in {{BufferWritingResultPartition}}. If different downstream subtasks have different throughput and hence different desired buffer sizes, then a single upstream subtask has to support having two different subpartitions with different buffer sizes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
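The "return a buffer not greater than the announced size" contract can be sketched as follows; this is a hedged toy model with illustrative names (byte arrays standing in for network buffers), not the actual PipelinedSubpartition API.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hedged sketch: if the queued data is larger than the downstream's announced
// buffer size, pollBuffer returns only a slice of it; the remainder stays
// queued for the next poll.
final class SubpartitionSketch {
    private final Queue<byte[]> queued = new ArrayDeque<>();
    private int offset; // read position inside the head buffer

    void add(byte[] data) {
        queued.add(data);
    }

    /** Polls at most {@code announcedSize} bytes, never more than the downstream can take. */
    byte[] pollBuffer(int announcedSize) {
        byte[] head = queued.peek();
        if (head == null) {
            return null;
        }
        int length = Math.min(head.length - offset, announcedSize);
        byte[] slice = new byte[length];
        System.arraycopy(head, offset, slice, 0, length);
        offset += length;
        if (offset == head.length) { // head fully consumed, advance the queue
            queued.poll();
            offset = 0;
        }
        return slice;
    }
}
```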
[jira] [Created] (FLINK-23456) Manually test on cluster
Anton Kalashnikov created FLINK-23456: - Summary: Manually test on cluster Key: FLINK-23456 URL: https://issues.apache.org/jira/browse/FLINK-23456 Project: Flink Issue Type: Sub-task Reporter: Anton Kalashnikov Test different jobs:
* a job with static throughput *(must work for successful MVP)*
* a job with gracefully varying load (sine-shaped load?) *(should work for successful MVP)*
* a job with erratic load (emulating an evil window operator firing) *(would be nice to work for MVP)*
Test all jobs on a cluster and check whether the buffer size adjustment behaves erratically. Verify our final goals:
* that the aligned checkpoint time in backpressured jobs is reduced to sane values (roughly {{numberOfExchanges * configuredBufferedDataTime * 2}})
* whether our changes affected the throughput.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
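For the verification step, the rough upper bound quoted above can be computed directly. This is illustrative arithmetic only; the method name is hypothetical and the formula is just the rough bound stated in this ticket.

```java
// Illustrative helper: the rough upper bound on aligned checkpoint time from this
// ticket, numberOfExchanges * configuredBufferedDataTime * 2.
class AlignmentBound {
    static long expectedMaxAlignmentMillis(int numberOfExchanges, long configuredBufferedDataTimeMillis) {
        return (long) numberOfExchanges * configuredBufferedDataTimeMillis * 2;
    }
}
```

For example, with 5 exchanges and 1 second of configured buffered data time, the bound is 10 seconds.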
[jira] [Created] (FLINK-23454) Sending the buffer of the right size for unicast
Anton Kalashnikov created FLINK-23454: - Summary: Sending the buffer of the right size for unicast Key: FLINK-23454 URL: https://issues.apache.org/jira/browse/FLINK-23454 Project: Flink Issue Type: Sub-task Reporter: Anton Kalashnikov It is not enough to know just the number of available buffers (credits) for the downstream, because the size of these buffers can differ. So we propose to resolve this problem in the following way: if the downstream buffer size changes, then the upstream should send buffers no larger than the new size, regardless of how big the current buffer on the upstream side is (pollBuffer should receive a parameter like bufferSize and return a buffer no larger than it). The downstream can support any buffer size up to the max buffer size, so it should be enough to request a BufferBuilder with the new size after receiving the announcement, leaving existing BufferBuilders/BufferConsumers unchanged. In other words, the code in {{PipelinedSubpartition(View)}} doesn’t need to be changed (apart from forwarding the new buffer size to the {{BufferWritingResultPartition}}). All buffer size adjustments can be implemented exclusively in {{BufferWritingResultPartition}}. If different downstream subtasks have different throughput and hence different desired buffer sizes, then a single upstream subtask has to support two different subpartitions with different buffer sizes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
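The last point, one upstream subtask serving subpartitions with different desired buffer sizes, amounts to keeping an independent size per subpartition. A minimal sketch with hypothetical names (not Flink's actual classes):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: each subpartition keeps its own desired buffer size,
// updated independently by announcements from its downstream consumer.
class SubpartitionBufferSizes {
    private final int defaultSize;
    private final Map<Integer, Integer> sizes = new HashMap<>();

    SubpartitionBufferSizes(int defaultSize) {
        this.defaultSize = defaultSize;
    }

    /** Record the size announced by the downstream consumer of one subpartition. */
    void announce(int subpartition, int newSize) {
        sizes.put(subpartition, newSize);
    }

    /** Size to use for the next BufferBuilder of this subpartition. */
    int sizeFor(int subpartition) {
        return sizes.getOrDefault(subpartition, defaultSize);
    }
}
```

A slow consumer can thus shrink its own subpartition's buffers without affecting the sizes used for faster consumers of the same result partition.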
[jira] [Created] (FLINK-23453) Dynamic calculation of the buffer size
Anton Kalashnikov created FLINK-23453: - Summary: Dynamic calculation of the buffer size Key: FLINK-23453 URL: https://issues.apache.org/jira/browse/FLINK-23453 Project: Flink Issue Type: Sub-task Reporter: Anton Kalashnikov To calculate the desired buffer size, we need to take into account the throughput, the configuration (timeInInputBuffer), and the actual number of buffers in use. It makes sense to use an EMA for this calculation to smooth out intermittent spikes. Basing the calculation on the actual number of buffers in use helps to avoid problems with data skew (when only a couple of channels out of thousands have any data). So the solution needs to reliably and efficiently calculate either the estimated or the average number of buffers in use. In the MVP, the buffer size is allowed to be erratic if making it stable turns out to be non-trivial. -- This message was sent by Atlassian Jira (v8.3.4#803005)
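The calculation described above can be sketched as follows. This is an illustrative sketch under assumed names: the smoothing factor `alpha` and the plain division by buffers in use are this sketch's simplifications, not the final design.

```java
// Hypothetical sketch: smooth the measured throughput with an EMA, then size each
// buffer so that the buffers actually in use hold roughly timeInInputBufferMillis
// worth of data, capped at the maximum buffer size.
class BufferSizeCalculator {
    private final double alpha;                 // EMA smoothing factor, 0 < alpha <= 1
    private final long timeInInputBufferMillis; // configured target time in buffers
    private final int maxBufferSize;
    private double throughputEma;               // bytes per millisecond

    BufferSizeCalculator(double alpha, long timeInInputBufferMillis, int maxBufferSize) {
        this.alpha = alpha;
        this.timeInInputBufferMillis = timeInInputBufferMillis;
        this.maxBufferSize = maxBufferSize;
    }

    /** Feed a new throughput sample (bytes per millisecond) into the EMA. */
    void addSample(double bytesPerMilli) {
        throughputEma = alpha * bytesPerMilli + (1 - alpha) * throughputEma;
    }

    /** Desired size of one buffer given how many buffers are actually in use. */
    int desiredBufferSize(int buffersInUse) {
        if (buffersInUse <= 0) {
            return maxBufferSize;
        }
        long totalDesiredBytes = (long) (throughputEma * timeInInputBufferMillis);
        return (int) Math.min(maxBufferSize, Math.max(1, totalDesiredBytes / buffersInUse));
    }
}
```

Dividing by the buffers actually in use, rather than the configured total, is what makes the result robust to data skew: with only two active channels out of thousands, the desired bytes are spread over two buffers, not thousands.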
[jira] [Created] (FLINK-23452) Measuring subtask throughput
Anton Kalashnikov created FLINK-23452: - Summary: Measuring subtask throughput Key: FLINK-23452 URL: https://issues.apache.org/jira/browse/FLINK-23452 Project: Flink Issue Type: Sub-task Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov In the first implementation, throughput could be measured for the whole subtask. The throughput calculation should take into account the number of bytes handled and the backpressure time, while ignoring the idle time. The main idea is to keep the balance between idle and backpressure time: if the backpressure time is high, we should decrease the buffer size to provide the configured handling time; conversely, if the subtask is idle, that period should be excluded from the throughput calculation. Otherwise, in the case of a network bottleneck, we might end up with a small buffer size that causes the bottleneck in the first place, yet be unable to increase it because the idle time reduces the measured throughput and thus keeps the buffer size low. -- This message was sent by Atlassian Jira (v8.3.4#803005)
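The balance described above can be sketched in a few lines. This is a simplified, hypothetical sketch (not Flink's actual throughput calculator): busy and backpressure time count toward the measurement window, idle time contributes neither bytes nor time.

```java
// Hypothetical sketch: accumulate bytes and time, counting backpressure time but
// excluding idle time, so an idle subtask does not look slow and get tiny buffers.
class ThroughputMeter {
    private long bytes;
    private long busyAndBackpressureMillis;

    void onDataProcessed(long byteCount, long elapsedMillis) {
        bytes += byteCount;
        busyAndBackpressureMillis += elapsedMillis;
    }

    /** Backpressure counts as time: it should drive the buffer size down. */
    void onBackpressure(long elapsedMillis) {
        busyAndBackpressureMillis += elapsedMillis;
    }

    /** Idle time is deliberately ignored: it contributes neither bytes nor time. */
    void onIdle(long elapsedMillis) {
        // no-op by design
    }

    /** Bytes per millisecond over the non-idle portion of the window. */
    double throughput() {
        return busyAndBackpressureMillis == 0 ? 0.0 : (double) bytes / busyAndBackpressureMillis;
    }
}
```

With this accounting, a long idle stretch leaves the measured throughput unchanged, while a backpressured stretch lowers it, which is exactly the asymmetry the ticket asks for.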
[jira] [Created] (FLINK-23451) FLIP-183: Dynamic buffer size adjustment
Anton Kalashnikov created FLINK-23451: - Summary: FLIP-183: Dynamic buffer size adjustment Key: FLINK-23451 URL: https://issues.apache.org/jira/browse/FLINK-23451 Project: Flink Issue Type: New Feature Reporter: Anton Kalashnikov Umbrella ticket for https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23041) Change local alignment timeout back to the global time out
Anton Kalashnikov created FLINK-23041: - Summary: Change local alignment timeout back to the global time out Key: FLINK-23041 URL: https://issues.apache.org/jira/browse/FLINK-23041 Project: Flink Issue Type: Bug Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov Local alignment timeouts are very confusing, and especially without a timeout on the outputs they can significantly delay the switch to unaligned checkpoints (UC). The problematic case is when all checkpoint barriers are received with a long delay because of backpressure, but arrive at the same time. The alignment time can be low (milliseconds) while the start delay is ~1 minute. In that case the checkpoint doesn't time out to UC and passes the responsibility to time out down the stream, so it is not transparent to the user why and when AC switches to UC. As mentioned before, the start delay is not correlated with the alignment timeout because it doesn't take into account the time in the output buffer, and the alignment time is not fully correlated with the alignment timeout because the alignment time doesn't take into account the barrier announcement. Based on this, the proposal is to change the semantics of the alignmentTimeout configuration to the following: *the time between the start of the checkpoint (on the checkpoint coordinator) and the time when the checkpoint barrier is received by the task.* With this definition we get a kind of global timeout: if the AC isn't finished within alignmentTimeout, it is switched to UC. -- This message was sent by Atlassian Jira (v8.3.4#803005)
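Under the proposed semantics, the decision reduces to a single comparison against the coordinator's trigger time rather than the first barrier arrival. An illustrative sketch with hypothetical names:

```java
// Hypothetical sketch of the proposed global semantic: measure the timeout from
// the checkpoint trigger time on the coordinator, not from local barrier arrival.
class GlobalAlignmentTimeout {
    /**
     * True if the aligned checkpoint should switch to unaligned: the barrier was
     * received more than alignmentTimeoutMillis after the checkpoint was
     * triggered on the checkpoint coordinator.
     */
    static boolean shouldTimeOutToUnaligned(
            long checkpointTriggerTimeMillis,
            long barrierReceivedTimeMillis,
            long alignmentTimeoutMillis) {
        return barrierReceivedTimeMillis - checkpointTriggerTimeMillis > alignmentTimeoutMillis;
    }
}
```

Because the start delay is included in the measured interval, the "high start delay, low alignment time" case from this ticket now times out to UC instead of silently staying aligned.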
[jira] [Created] (FLINK-23034) NPE in JobDetailsDeserializer during the reading old version of ExecutionState
Anton Kalashnikov created FLINK-23034: - Summary: NPE in JobDetailsDeserializer during the reading old version of ExecutionState Key: FLINK-23034 URL: https://issues.apache.org/jira/browse/FLINK-23034 Project: Flink Issue Type: Bug Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov There is no compatibility for ExecutionState:
{noformat}
java.lang.NullPointerException
	at org.apache.flink.runtime.messages.webmonitor.JobDetails$JobDetailsDeserializer.deserialize(JobDetails.java:308)
	at org.apache.flink.runtime.messages.webmonitor.JobDetails$JobDetailsDeserializer.deserialize(JobDetails.java:278)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3479)
	at org.apache.flink.runtime.messages.webmonitor.JobDetailsTest.testJobDetailsCompatibleUnmarshalling(JobDetailsTest.java:82)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
{noformat}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
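One way such a compatibility NPE is typically avoided — a hypothetical sketch, not the actual JobDetailsDeserializer fix — is to treat a per-state task count that is absent from JSON written by an older version as zero instead of unboxing a null:

```java
// Hypothetical sketch: a defensive lookup that tolerates JSON written before a
// given ExecutionState existed, where its per-state count is simply missing.
class StateCountCompat {
    /** Missing entries (old format) are read as 0 instead of throwing an NPE. */
    static int countOrZero(java.util.Map<String, Integer> serializedCounts, String stateName) {
        Integer count = serializedCounts.get(stateName);
        return count == null ? 0 : count;
    }
}
```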
[jira] [Created] (FLINK-22961) Incorrect calculation of alignment timeout for LocalInputChannel
Anton Kalashnikov created FLINK-22961: - Summary: Incorrect calculation of alignment timeout for LocalInputChannel Key: FLINK-22961 URL: https://issues.apache.org/jira/browse/FLINK-22961 Project: Flink Issue Type: Bug Reporter: Anton Kalashnikov Assignee: Anton Kalashnikov Right now, the calculation of the alignment timeout happens inside SingleCheckpointBarrierHandler (org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.ControllerImpl#isTimedOut) and is based on firstBarrierArrivalTime. The firstBarrierArrivalTime is recalculated only when a barrier announcement is received, but if we receive the first checkpoint barrier from a LocalInputChannel, which doesn't support announcements, the calculation of the alignment timeout will be based on the firstBarrierArrivalTime from the previous checkpoint, which is wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22684) Add the ability to ignore in-flight data on recovery
Anton Kalashnikov created FLINK-22684: - Summary: Add the ability to ignore in-flight data on recovery Key: FLINK-22684 URL: https://issues.apache.org/jira/browse/FLINK-22684 Project: Flink Issue Type: Improvement Reporter: Anton Kalashnikov The main case:
* We want to restore the last unaligned checkpoint.
* The in-flight data of this checkpoint is corrupted.
* We want to ignore this corrupted data and restore only the states.
The idea is to have a new configuration parameter ('ignoreInFlightDataOnRecovery' or similar). If it is set to true, the metadata of the in-flight data is ignored on the Checkpoint Coordinator side. -- This message was sent by Atlassian Jira (v8.3.4#803005)
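The proposed flag could be read and applied as below. Both the key name and the surrounding API are purely illustrative assumptions of this sketch, not an actual Flink option:

```java
// Purely illustrative sketch of the proposed flag; the key name and the API are
// hypothetical, not an actual Flink configuration option.
class RecoveryOptions {
    static final String IGNORE_IN_FLIGHT_DATA_KEY = "ignore-in-flight-data-on-recovery";

    /** True if the coordinator should drop in-flight data metadata on restore. */
    static boolean ignoreInFlightData(java.util.Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(IGNORE_IN_FLIGHT_DATA_KEY, "false"));
    }
}
```

When the flag is set, the restore path would keep operator/keyed state from the checkpoint but skip deserializing the channel (in-flight) state, which is exactly the corrupted part in the motivating case.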
[jira] [Created] (FLINK-22379) Introduce a new JobStatus to avoid premature checkpoint triggering
Anton Kalashnikov created FLINK-22379: - Summary: Introduce a new JobStatus to avoid premature checkpoint triggering Key: FLINK-22379 URL: https://issues.apache.org/jira/browse/FLINK-22379 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Anton Kalashnikov Right now, when JobStatus switches to RUNNING, it allows the CheckpointCoordinator to trigger checkpoints, which is fine. But unfortunately, JobStatus switches to RUNNING before the TaskState (ExecutionState) even switches to SCHEDULED. This leads to several problems, one of which can be seen in the log:
{noformat}
WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job bc943302f92d979824fbc8f4cabc5db3.
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: EventSource -> Timestamps/Watermarks (1/7) of job bc943302f92d979824fbc8f4cabc5db3 has not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
	at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_272]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
{noformat}
To avoid this problem, it is a good idea to introduce a new JobStatus between CREATED and RUNNING (RESTORING?). And then:
* JobStatus CREATED switches to RESTORING at the point where CREATED currently switches to RUNNING
* JobStatus RESTORING switches to RUNNING when all tasks have switched their states from INITIALIZING to RUNNING
It also makes sense to rename ExecutionState.INITIALIZING to RESTORING in order to have the same name for job and task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
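The proposed lifecycle can be sketched as a small state machine. This is a hypothetical illustration of the ticket's proposal, not Flink's actual JobStatus implementation:

```java
// Hypothetical sketch of the proposal: a RESTORING status between CREATED and
// RUNNING, entered where the job currently jumps straight to RUNNING, and left
// only once every task has switched from INITIALIZING to RUNNING.
class JobLifecycle {
    enum Status { CREATED, RESTORING, RUNNING }

    private Status status = Status.CREATED;
    private final int totalTasks;
    private int runningTasks;

    JobLifecycle(int totalTasks) {
        this.totalTasks = totalTasks;
    }

    /** Today this transition goes directly to RUNNING; the proposal inserts RESTORING. */
    void startScheduling() {
        status = Status.RESTORING;
    }

    /** Called as each task reaches RUNNING; the job follows only when all have. */
    void taskSwitchedToRunning() {
        runningTasks++;
        if (runningTasks == totalTasks) {
            status = Status.RUNNING; // checkpoints may be triggered from here on
        }
    }

    Status status() {
        return status;
    }
}
```

Gating checkpoint triggering on the job-level RUNNING status then guarantees the "Not all required tasks are currently running" warning above cannot occur.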
[jira] [Created] (FLINK-22215) Rename RECOVERING state to RESTORING
Anton Kalashnikov created FLINK-22215: - Summary: Rename RECOVERING state to RESTORING Key: FLINK-22215 URL: https://issues.apache.org/jira/browse/FLINK-22215 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: Anton Kalashnikov Task https://issues.apache.org/jira/browse/FLINK-17012 introduced a new ExecutionState, RECOVERING, which restores data from the last checkpoint. There are thoughts that RECOVERING is not a suitable name for this case and that it would be much better to rename it to RESTORING. -- This message was sent by Atlassian Jira (v8.3.4#803005)