[jira] [Created] (FLINK-31610) Refactoring of LocalBufferPool

2023-03-24 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-31610:
-

 Summary: Refactoring of LocalBufferPool
 Key: FLINK-31610
 URL: https://issues.apache.org/jira/browse/FLINK-31610
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.17.0
Reporter: Anton Kalashnikov


The FLINK-31293 bug highlighted issues with the internal mutual consistency of different fields in LocalBufferPool, e.g.:
-  `numberOfRequestedOverdraftMemorySegments`
-  `numberOfRequestedMemorySegments`
-  `availableMemorySegments`
-  `currentPoolSize`

Most of the problems have already been fixed (I hope), but it is a good idea to reorganize the code in such a way that all invariants between these fields are clearly defined and difficult to break.

As one example, I can propose getting rid of `numberOfRequestedOverdraftMemorySegments` and using the existing `numberOfRequestedMemorySegments` instead. That means:

- the pool will be available when `!availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0`
- we don't request a new `ordinary` buffer when `numberOfRequestedMemorySegments >= currentPoolSize` but request an overdraft buffer instead
- `setNumBuffers` should work automatically without any changes

I think we can come up with a couple of such improvements to simplify the code.
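
A minimal sketch of the unified accounting described above (the field names come from this ticket; the helper methods `requestFromNetworkBufferPool()` and `requestOverdraftFromNetworkBufferPool()` are placeholders, not the real LocalBufferPool API):

{noformat}
// Availability depends only on the cached segments and the per-subpartition limit.
private boolean shouldBeAvailable() {
    return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;
}

private MemorySegment requestSegment() {
    final MemorySegment segment;
    if (numberOfRequestedMemorySegments < currentPoolSize) {
        segment = requestFromNetworkBufferPool();            // "ordinary" buffer
    } else {
        segment = requestOverdraftFromNetworkBufferPool();   // overdraft buffer
    }
    if (segment != null) {
        // ordinary and overdraft buffers share the same counter, so the invariant
        // between numberOfRequestedMemorySegments and currentPoolSize lives in one place
        numberOfRequestedMemorySegments++;
    }
    return segment;
}
{noformat}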



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30278) Unexpected config mutation in SinkTransformationTranslator

2022-12-02 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-30278:
-

 Summary: Unexpected config mutation in 
SinkTransformationTranslator 
 Key: FLINK-30278
 URL: https://issues.apache.org/jira/browse/FLINK-30278
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.16.0, 1.17.0
Reporter: Anton Kalashnikov


If we forbid changing the configuration programmatically (`execution.program-config.enabled`) and try to use `FileSink`, the following exception occurs:

  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Not allowed configuration change(s) were detected:
 - Configuration parallelism.default:1 not allowed in the configuration object 
ExecutionConfig.
   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:364)
   at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:225)
   at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
   at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:319)
   at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:262)
   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
Source)
   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
   at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
   at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
   at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
   at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
   at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
   at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
Source)
   at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
   at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
   at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.client.program.MutatedConfigurationException: Not 
allowed configuration change(s) were detected:
 - Configuration parallelism.default:1 not allowed in the configuration object 
ExecutionConfig.
   at 
org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235)
   at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175)
   at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049)
   at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:81)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown 
Source)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
   at java.base/java.lang.reflect.Method.invoke(Unknown Source)
   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347)
   ... 16 more


It happens because inside `SinkTransformationTranslator` we have the following logic:
* Remember the current parallelism
* Set the parallelism to the default
* Do the transformation
* Set the parallelism back to the remembered one

But if the initial parallelism is already the default, we should actually do nothing. According to the current logic, we explicitly set the default value in the configuration, which is itself a programmatic config mutation (which we want to avoid).

See 
org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator.SinkExpander#executionEnvironment:341
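
A minimal sketch of the guard that would avoid the mutation (variable names and `expandSink()` are illustrative placeholders, not the actual SinkExpander code; only `ExecutionConfig.PARALLELISM_DEFAULT` is a real constant):

{noformat}
final int rememberedParallelism = executionEnvironment.getParallelism();
final boolean parallelismWasExplicit =
        rememberedParallelism != ExecutionConfig.PARALLELISM_DEFAULT;

if (parallelismWasExplicit) {
    executionEnvironment.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
}
try {
    expandSink(); // "do the transformation"
} finally {
    if (parallelismWasExplicit) {
        // only restore what we actually changed, so an already-default parallelism
        // never shows up as a programmatic config mutation
        executionEnvironment.setParallelism(rememberedParallelism);
    }
}
{noformat}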
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28468) Creating benchmark channel in Apache Flink slack

2022-07-08 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-28468:
-

 Summary: Creating benchmark channel in Apache Flink slack
 Key: FLINK-28468
 URL: https://issues.apache.org/jira/browse/FLINK-28468
 Project: Flink
  Issue Type: Improvement
Reporter: Anton Kalashnikov


We need to create a Slack channel to which the benchmark results will be sent.
Steps:
* Create the `dev-benchmarks` channel (help from an admin of the Apache Flink Slack is needed).
* Create a `Slack app` for sending benchmark results to Slack (admin help is needed as well).
* Configure the existing Jenkins with the new `Slack app`'s token so that it sends the benchmark results to this new channel.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28407) Make Jenkins benchmark script more autonomous

2022-07-05 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-28407:
-

 Summary: Make Jenkins benchmark script more autonomous
 Key: FLINK-28407
 URL: https://issues.apache.org/jira/browse/FLINK-28407
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Reporter: Anton Kalashnikov


Right now, we have Jenkins scripts in the benchmarks repository which contain all the logic for launching benchmarks and sending data to Codespeed. But unfortunately, it is not clear to external observers how to use them in case the current Jenkins is lost.
The suggestion is:
* add information about Jenkins and Codespeed to the Readme file.
* add default values to the script rather than keeping them only in Jenkins.

After this task, anybody should easily be able to configure these benchmarks on their own Jenkins + Codespeed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28243) Considering only the time of the targeted operation for calculating benchmark performance

2022-06-24 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-28243:
-

 Summary: Considering only the time of the targeted operation for calculating benchmark performance
 Key: FLINK-28243
 URL: https://issues.apache.org/jira/browse/FLINK-28243
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Reporter: Anton Kalashnikov


Right now, in many benchmarks, the iteration time consists of many parts which are not always connected to the target of the particular benchmark. For example, if we want to benchmark the async operator, we want to take into account only the time spent in this operator, but currently the time of one iteration of asyncOperatorBenchmark is `job/tasks initialization phase` + `operator execution` + `shutdown phase`.

Ideally, we want to somehow isolate the `operator execution` time and consider only it in this benchmark, while the `job/tasks initialization phase` should be benchmarked separately.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28241) Massive regression on 20.06.2021

2022-06-24 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-28241:
-

 Summary: Massive regression on 20.06.2021
 Key: FLINK-28241
 URL: https://issues.apache.org/jira/browse/FLINK-28241
 Project: Flink
  Issue Type: Bug
Reporter: Anton Kalashnikov


FLINK-27921 added the property `REQUIREMENTS_CHECK_DELAY`, which optimizes the number of resource checks but also introduces a small delay during the init phase; this is not critical at all for the normal scenario. Unfortunately, our benchmarks cannot exclude the time of the init phase (or shutdown phase) from the total benchmark time. As a result, all benchmarks that have a relatively short execution time (compared to the init phase time) show a regression (up to 50%). As a quick solution, it makes sense to just set `REQUIREMENTS_CHECK_DELAY` to zero for all benchmarks, as sketched after the list below.
A non-exhaustive list of affected benchmarks:
 * asyncWait
 * globalWindow
 * mapSink
 * readFileSplit
 * remoteRebalance
 * serializer*
 * sorted*
 * twoInputOneIdleMapSink
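
A minimal sketch of that quick fix in a benchmark's configuration, assuming the property is exposed as `ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY` and accepts a `Duration` (if the option lives elsewhere, only the constant reference changes):

{noformat}
import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;

Configuration benchmarkConf = new Configuration();
// remove the requirements-check delay so the init phase does not inflate
// the measured iteration time of short-running benchmarks
benchmarkConf.set(ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY, Duration.ZERO);
{noformat}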



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-26326) FLIP-203: Support native and incremental savepoints 1.1

2022-02-23 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-26326:
-

 Summary: FLIP-203: Support native and incremental savepoints 1.1
 Key: FLINK-26326
 URL: https://issues.apache.org/jira/browse/FLINK-26326
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing
Reporter: Anton Kalashnikov
 Fix For: 1.16.0


This is the second part of the implementation of 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints]

The first one: FLINK-25276

 

Motivation: Currently, with non-incremental canonical-format savepoints and very large state, both taking a savepoint and recovering from it can take a very long time. Providing options to take native-format and incremental savepoints would alleviate this problem.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26325) Add test coverage for native format State Processor API

2022-02-23 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-26325:
-

 Summary: Add test coverage for native format State Processor API
 Key: FLINK-26325
 URL: https://issues.apache.org/jira/browse/FLINK-26325
 Project: Flink
  Issue Type: Sub-task
  Components: API / State Processor
Reporter: Anton Kalashnikov


https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints

Check test coverage for:
 * State Processor API reading part

 

Right now we have tests that cover only reading from a CANONICAL savepoint, but it makes sense to add the same tests for a NATIVE savepoint and an aligned checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26134) Document checkpoint/savepoint guarantees

2022-02-14 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-26134:
-

 Summary: Document checkpoint/savepoint guarantees
 Key: FLINK-26134
 URL: https://issues.apache.org/jira/browse/FLINK-26134
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


Document checkpoint/savepoint guarantees according to table from 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Proposal



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26130) Document why and when user would like to increase network buffer size

2022-02-14 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-26130:
-

 Summary: Document why and when user would like to increase network 
buffer size
 Key: FLINK-26130
 URL: https://issues.apache.org/jira/browse/FLINK-26130
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


Expand 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/#the-number-of-in-flight-buffers
 with more details.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26002) Add test coverage for native format job upgrades

2022-02-08 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-26002:
-

 Summary: Add test coverage for native format job upgrades
 Key: FLINK-26002
 URL: https://issues.apache.org/jira/browse/FLINK-26002
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.15.0
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov
 Fix For: 1.15.0


# Initialization of a job with a stateful operator.
# Take a native savepoint / canonical savepoint / aligned checkpoint.
## Change the job shape (new custom operator) and/or the record types.
## One job with all possible network exchanges (forward, keyBy, rebalance, broadcast, random, rescale), followed by a stateful operator.
## No need to modify the state schema. Just validate in some way that after the upgrade the state is assigned to the correct operators (so the state should affect the result of the operators/functions).
## Some record type that would allow us to validate the consistency of the computations.
## A validating sink, checking for consistency.
## Arbitrary job upgrade. Add two new operators: a chained mapping operator and another keyed operator preceded by a keyed exchange. Change the record type from int to string, without changing the "logical" value of the record (for example, change 1 → "1").
## Job upgrade without changing the graph record type: no need to test for that.
# Restore from the savepoint/checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25891) NoClassDefFoundError AsyncSSLPrivateKeyMethod in benchmark

2022-01-31 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-25891:
-

 Summary: NoClassDefFoundError AsyncSSLPrivateKeyMethod in benchmark
 Key: FLINK-25891
 URL: https://issues.apache.org/jira/browse/FLINK-25891
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.15.0
Reporter: Anton Kalashnikov






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25650) Document unaligned checkpoints performance limitations (larger records/flat map/timers/...)

2022-01-13 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-25650:
-

 Summary: Document unaligned checkpoints performance limitations 
(larger records/flat map/timers/...)
 Key: FLINK-25650
 URL: https://issues.apache.org/jira/browse/FLINK-25650
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


An unaligned checkpoint can be delayed if the current record takes too long to be consumed (because it is too large, or the operator is a flat map, etc.), which can be pretty confusing. So it makes sense to document this limitation to give the user an understanding of this situation.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25646) Document buffer debloating issues with high parallelism

2022-01-13 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-25646:
-

 Summary: Document buffer debloating issues with high parallelism
 Key: FLINK-25646
 URL: https://issues.apache.org/jira/browse/FLINK-25646
 Project: Flink
  Issue Type: Improvement
Reporter: Anton Kalashnikov


According to the latest benchmarks, there are some problems with buffer debloating when a job has high parallelism. What counts as high parallelism differs from job to job, but in general it is more than 200. So it makes sense to document that problem and propose the solution: increasing the number of buffers.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25590) Logging warning of insufficient memory for all configured buffers

2022-01-10 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-25590:
-

 Summary: Logging warning of insufficient memory for all configured 
buffers
 Key: FLINK-25590
 URL: https://issues.apache.org/jira/browse/FLINK-25590
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.15.0
Reporter: Anton Kalashnikov


Right now, if the exclusive buffers for the input channels and one buffer for each subpartition are allocated successfully on start, but there is not enough memory for the rest of the buffers (floating buffers, the remaining buffers for subpartitions), then we see nothing about that in the log (as I understand it).
So first of all, we need to check what logs we currently have for situations when Flink doesn't have enough memory for all configured buffers. If we have nothing (or not enough), we should add such a log.
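
A minimal sketch of what such a warning could look like (the wording, placement and counters are illustrative assumptions; only the SLF4J logging style matches what Flink already uses):

{noformat}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);

// obtainedBuffers / configuredBuffers are hypothetical counters kept during setup
if (obtainedBuffers < configuredBuffers) {
    LOG.warn(
            "Only {} out of {} configured network buffers could be allocated; "
                    + "floating buffers may be missing and throughput can degrade.",
            obtainedBuffers,
            configuredBuffers);
}
{noformat}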



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25556) Extra waiting of final checkpoint in benchmarks

2022-01-06 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-25556:
-

 Summary: Extra waiting of final checkpoint in benchmarks
 Key: FLINK-25556
 URL: https://issues.apache.org/jira/browse/FLINK-25556
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks, Runtime / Checkpointing
Reporter: Anton Kalashnikov


After https://issues.apache.org/jira/browse/FLINK-25105 was finished, Flink always waits for the final checkpoint after all tasks are finished, which prolongs the test and leads to incorrect benchmark results. The idea is to disable `ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH` for all benchmarks.
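
A minimal sketch of the proposed change in the benchmark environment setup, assuming the flag named above is exposed as `ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH` (the exact location of the constant is an assumption; `env` is the benchmark's StreamExecutionEnvironment):

{noformat}
Configuration conf = new Configuration();
// do not wait for the final checkpoint after all tasks have finished,
// so the benchmark time only covers the actual workload
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
env.configure(conf);
{noformat}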



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25454) Negative time in throughput calculator

2021-12-27 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-25454:
-

 Summary: Negative time in throughput calculator
 Key: FLINK-25454
 URL: https://issues.apache.org/jira/browse/FLINK-25454
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


Found during a random test:

{noformat}
2021-12-23 11:52:01,645 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - KeyedProcess -> Sink: Unnamed (3/3)#0 
(1321490f33c6370f2d68c413a8a0b0c1) switched from RUNNING to FAILED with failure 
cause: java.lang.IllegalArgumentException: Time should be non negative
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at 
org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:90)
at 
org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:81)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.triggerDebloating(SingleInputGate.java:414)
at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.triggerDebloating(InputGateWithMetrics.java:90)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:786)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$scheduleBufferDebloater$3(StreamTask.java:777)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:801)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:750)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Unknown Source)
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24738) Fail during announcing buffer size to released local channel

2021-11-02 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24738:
-

 Summary: Fail during announcing buffer size to released local 
channel
 Key: FLINK-24738
 URL: https://issues.apache.org/jira/browse/FLINK-24738
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


Since we can release all resources not only when the mailbox is finished but also from within the mailbox:
{noformat}
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.releaseAllResources(LocalInputChannel.java:331)
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.transformEvent(SingleInputGate.java:808)
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.transformToBufferOrEvent(SingleInputGate.java:757)
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:687)
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:666)
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:142)
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:150)
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:503)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:816)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:768)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:936)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:750)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
{noformat}
it is possible that afterwards the BufferDebloater announces a new buffer size, which fails because the channel has already been released:
{noformat}
Caused by: java.lang.IllegalStateException: Channel released.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.announceBufferSize(LocalInputChannel.java:354)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.announceBufferSize(SingleInputGate.java:389)
at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.announceBufferSize(InputGateWithMetrics.java:102)
at 
org.apache.flink.streaming.runtime.tasks.bufferdebloat.BufferDebloater.recalculateBufferSize(BufferDebloater.java:101)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:801)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:791)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:816)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:768)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:936)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:750)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.lang.Thread.run(Thread.java:748)
{noformat}
So I think that we should replace the `checkState` with an `if` in LocalInputChannel#announceBufferSize, since a released channel is expected here.
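
A minimal sketch of that change (illustrative, not the actual patch; the method name comes from the stack trace above, while the `int` parameter and the `isReleased` field name are assumptions):

{noformat}
@Override
public void announceBufferSize(int newBufferSize) {
    if (isReleased) {
        // the channel may legitimately be released from within the mailbox before
        // the BufferDebloater announces the new size, so silently ignore the call
        return;
    }
    // ... existing announcement logic ...
}
{noformat}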



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24734) testSecretOption fails due to missing required options

2021-11-02 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24734:
-

 Summary: testSecretOption fails due to missing required options
 Key: FLINK-24734
 URL: https://issues.apache.org/jira/browse/FLINK-24734
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.13.3
Reporter: Anton Kalashnikov


{noformat}
java.lang.AssertionError: Expected: (an instance of 
org.apache.flink.table.api.ValidationException and Expected failure cause is 
)
     but: Expected failure cause is 

 The throwable 
 does not contain the expected failure cause 
Stacktrace
 was: org.apache.flink.table.api.ValidationException: Unable to create a source 
for reading table 'default.default.t1'.
Table options are:
'buffer-size'='1000''connector'='test-connector''key.format'='test-format''key.test-format.delimiter'=',''password'='**''property-version'='1''value.format'='test-format''value.test-format.delimiter'='|''value.test-format.fail-on-missing'='true'
 at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:140)
 at 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource(FactoryMocks.java:58)
 at 
org.apache.flink.table.factories.FactoryUtilTest.testError(FactoryUtilTest.java:374)
 at 
org.apache.flink.table.factories.FactoryUtilTest.testSecretOption(FactoryUtilTest.java:162)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
 at org.junit.rules.RunRules.evaluate(RunRules.java:20) at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:363) at 
org.junit.runner.JUnitCore.run(JUnitCore.java:137) at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
 at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
 at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
 at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)Caused by: 
org.apache.flink.table.api.ValidationException: One or more required options 
are missing.
Missing required options are:
target at 
org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:384)
 at 
org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:357)
 at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:718)
 at 
org.apache.flink.table.factories.TestDynamicTableFactory.createDynamicTableSource(TestDynamicTableFactory.java:80)
 at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
 ... 27 more

at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.junit.Assert.assertThat(Assert.java:956) at 
org.junit.Assert.assertThat(Assert.java:923) at 
org.junit.rules.ExpectedException.handleException(ExpectedException.java:252) 
at org.junit.rules.ExpectedException.access$000(ExpectedException.java:106) at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:241)
 at org.junit.rules.RunRules.evaluate(RunRules.java:20) at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at 

[jira] [Created] (FLINK-24690) Clarification of buffer size threshold calculation in BufferDebloater

2021-10-29 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24690:
-

 Summary: Clarification of buffer size threshold calculation in 
BufferDebloater 
 Key: FLINK-24690
 URL: https://issues.apache.org/jira/browse/FLINK-24690
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


It looks like the variable `skipUpdate` in BufferDebloater#recalculateBufferSize is calculated in a non-obvious way.

For example, if `taskmanager.network.memory.buffer-debloat.threshold-percentages` is set to 50 (meaning 50%), the allowed transitions are something like:
 * 32000 -> 16000 (possible)
 * 32000 -> 17000 (not possible)
 * 16000 -> 24000 (not possible) - although 16000 + 50% = 24000
 * 16000 -> 32000 (only this is possible)

This happens because the algorithm takes into account only the larger of the two values. So in the `16000 -> 24000` example it calculates 50% of 24000, meaning only the transition 12000 -> 24000 is possible.

In general, this approach is not so bad, especially for small values (instead of 256 -> 374, the minimum possible transition is 256 -> 512). But we should clarify it somewhere, with a test or a javadoc or both. Also, we can discuss changing this algorithm to a more natural implementation.
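
A small self-contained illustration of the behaviour described above (not the actual BufferDebloater code; the exact comparison operator is an assumption):

{noformat}
static boolean skipUpdate(int currentSize, int newSize, double thresholdFraction) {
    int difference = Math.abs(currentSize - newSize);
    int base = Math.max(currentSize, newSize); // only the largest value is considered
    return difference < base * thresholdFraction;
}

// With a 50% threshold:
// skipUpdate(32000, 16000, 0.5) -> false (update applied)
// skipUpdate(32000, 17000, 0.5) -> true  (15000 < 16000, update skipped)
// skipUpdate(16000, 24000, 0.5) -> true  ( 8000 < 12000, update skipped)
// skipUpdate(16000, 32000, 0.5) -> false (update applied)
{noformat}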

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24671) Possible NPE in LocalInputChannel#getBuffersInUseCount before initialization of subpartitionView

2021-10-27 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24671:
-

 Summary: Possible NPE in LocalInputChannel#getBuffersInUseCount 
before initialization of subpartitionView 
 Key: FLINK-24671
 URL: https://issues.apache.org/jira/browse/FLINK-24671
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


If the buffer debloater tries to get the number of buffers in use before the subpartitionView is initialized (or after it is released?), an NPE is possible in LocalInputChannel#getBuffersInUseCount.
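
A minimal null-safe sketch of the guard (illustrative, not the actual fix; the accessor name on the subpartition view is an assumption):

{noformat}
public int getBuffersInUseCount() {
    final ResultSubpartitionView view = subpartitionView;
    // before the view is set up (or after release) simply report zero buffers in use
    return view == null ? 0 : view.unsynchronizedGetNumberOfQueuedBuffers();
}
{noformat}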



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24659) Two active miniCluster in RemoteBenchmarkBase

2021-10-26 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24659:
-

 Summary: Two active miniCluster in RemoteBenchmarkBase
 Key: FLINK-24659
 URL: https://issues.apache.org/jira/browse/FLINK-24659
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks, Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


It seems that all children of RemoteBenchmarkBase work incorrectly, since they configure the environment for the miniCluster from FlinkEnvironmentContext but in reality use the miniCluster from RemoteBenchmarkBase. We should definitely remove one of them.

I think we can get rid of RemoteBenchmarkBase#miniCluster and use FlinkEnvironmentContext#miniCluster everywhere.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24658) Debug logs for buffer size calculation

2021-10-26 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24658:
-

 Summary: Debug logs for buffer size calculation
 Key: FLINK-24658
 URL: https://issues.apache.org/jira/browse/FLINK-24658
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


Since the buffer debloater recalculates the buffer size based on several different parameters, it makes sense to add debug logging that prints all of them when debugging is necessary.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24657) Add metric of the total real size of input/output buffers queue

2021-10-26 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24657:
-

 Summary: Add metric of the total real size of input/output buffers 
queue 
 Key: FLINK-24657
 URL: https://issues.apache.org/jira/browse/FLINK-24657
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


Right now we have a metric for the length of the input/output buffer queues, but since the buffer debloater was introduced this metric is not always helpful, because the real size of each buffer can differ. So the idea is to add a new metric that shows the total size of the buffers in the queue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24589) FLIP-183: Buffer debloating 1.2

2021-10-19 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24589:
-

 Summary: FLIP-183: Buffer debloating 1.2
 Key: FLINK-24589
 URL: https://issues.apache.org/jira/browse/FLINK-24589
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24578) Unexpected erratic load shape for channel skew load profile

2021-10-18 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24578:
-

 Summary: Unexpected erratic load shape for channel skew load 
profile
 Key: FLINK-24578
 URL: https://issues.apache.org/jira/browse/FLINK-24578
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Attachments: antiphaseBufferSize.png, erraticBufferSize1.png, 
erraticBufferSize2.png

given:

A job with 5 maps (with keyBy).

All channels are remote. Parallelism is 80.

The first task produces only two keys - `indexOfThisSubtask` and `indexOfThisSubtask + 1`. So every subTask has a constant number of active channels (depending on the hash rebalance).

Every record has an equal size and is processed for an equal time.

 

when: 

The buffer debloat is enabled with the default configuration.

 

then:

For some reason, the buffer size synchronizes across all subTasks of the first map. The synchronization can be strong, as shown in the erraticBufferSize1 picture, but usually it is less explicit, as in erraticBufferSize2.

!erraticBufferSize1.png!

 

Expected:

After the stabilization period, the buffer size should be mostly constant with small fluctuations, or the different tasks should be in antiphase to each other (when one subtask has a small buffer size, another should have a big buffer size) - see, for example, the antiphaseBufferSize picture.

!antiphaseBufferSize.png!

 

Unfortunately, it is not reproduced every time, which means this problem may be connected to the environment. But at least it makes sense to try to understand why we get such a strange load shape when only a few input channels are active.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24553) Change buffer debloating default configuration values

2021-10-14 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24553:
-

 Summary: Change buffer debloating default configuration values
 Key: FLINK-24553
 URL: https://issues.apache.org/jira/browse/FLINK-24553
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


After the investigation of buffer debloating effectiveness, there are some conclusions (see the configuration sketch after the list):
 * taskmanager.memory.min-segment-size can be decreased from 1024 to at least 256, because in some cases even 1024 is too big a value, and at the same time a low minimum value is not a problem.
 * taskmanager.network.memory.buffer-debloat.samples can be decreased from 20 to 10, or taskmanager.network.memory.buffer-debloat.period can be decreased from 500ms to 100ms or 200ms. According to the investigation, the current reaction speed is too slow, so it is better to increase it by changing one of these parameters.
 * taskmanager.network.memory.buffer-debloat.threshold-percentages can be decreased from 50 to 10, because no problems were found when the announcement of the buffer size happened more frequently, and it can actually positively influence the checkpoint time.
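
A sketch of what the suggested values would look like if set explicitly (the keys are the ones named above; the value formats, e.g. "256 b" and "100 ms", are assumptions):

{noformat}
Configuration conf = new Configuration();
conf.setString("taskmanager.memory.min-segment-size", "256 b");
conf.setString("taskmanager.network.memory.buffer-debloat.samples", "10");
conf.setString("taskmanager.network.memory.buffer-debloat.period", "100 ms");
conf.setString("taskmanager.network.memory.buffer-debloat.threshold-percentages", "10");
{noformat}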



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24551) BUFFER_DEBLOAT_SAMPLES property is taken from the wrong configuration

2021-10-14 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24551:
-

 Summary: BUFFER_DEBLOAT_SAMPLES property is taken from the wrong 
configuration
 Key: FLINK-24551
 URL: https://issues.apache.org/jira/browse/FLINK-24551
 Project: Flink
  Issue Type: Bug
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


Right now, StreamTask receives the BUFFER_DEBLOAT_SAMPLES property from the taskConfiguration, which is wrong. The right place for the debloat configuration is the taskManagerConfiguration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24481) Translate buffer debloater documentation to Chinese

2021-10-08 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24481:
-

 Summary: Translate buffer debloater documentation to Chinese
 Key: FLINK-24481
 URL: https://issues.apache.org/jira/browse/FLINK-24481
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


Translate the buffer debloater documentation to Chinese.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24470) EMA for buffer debloat level

2021-10-07 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24470:
-

 Summary: EMA for buffer debloat level
 Key: FLINK-24470
 URL: https://issues.apache.org/jira/browse/FLINK-24470
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


Right now, we use an EMA for smoothing the throughput calculation. But there is an idea to apply the EMA at the buffer debloat (BufferDebloater) level, because it contains more information (old buffer size, current throughput, buffers-in-use count, etc.) and it looks like the EMA could be used more effectively there.

NB: This is just an idea which makes sense to try, but if the new implementation doesn't bring a real profit, it makes sense to keep the old one.
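
As a reminder, the EMA itself is just the standard smoothing formula; a tiny sketch of how it could look if moved into the BufferDebloater (the smoothed quantity and `alpha` are illustrative):

{noformat}
// newSample could be the freshly calculated buffer size (or throughput per buffer),
// previousEma the previously announced value, alpha the smoothing factor in (0, 1].
static double ema(double previousEma, double newSample, double alpha) {
    return alpha * newSample + (1 - alpha) * previousEma;
}
{noformat}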



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24469) Incorrect calculation of the buffer size in case of channel data skew

2021-10-07 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24469:
-

 Summary: Incorrect calculation of the buffer size in case of channel data skew
 Key: FLINK-24469
 URL: https://issues.apache.org/jira/browse/FLINK-24469
 Project: Flink
  Issue Type: Bug
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


If only a couple out of 100 input channels are active, a huge throughput drop is observed. I suppose it happens because of this line inside SingleInputGate#getBuffersInUseCount:

{noformat}

Math.max(1, channel.getBuffersInUseCount());

{noformat}

Right now, I don't think it makes sense to have the min value as 1; perhaps zero buffers in use is also ok.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24468) NPE when notifyNewBufferSize

2021-10-07 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24468:
-

 Summary: NPE when notifyNewBufferSize
 Key: FLINK-24468
 URL: https://issues.apache.org/jira/browse/FLINK-24468
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


In my opinion, we have two problems here:
 # The exception itself (see below).
 # Ignoring the exception and stopping the rescheduling of the buffer size calculation.

Of course, we need to fix this NPE, and we need to think about what we want to do if the buffer debloat fails with an error.

{noformat}

java.lang.NullPointerException: null
 at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.notifyNewBufferSize(CreditBasedPartitionRequestClientHandler.java:135)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.notifyNewBufferSize(NettyPartitionRequestClient.java:203)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyNewBufferSize(RemoteInputChannel.java:330)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.announceBufferSize(RemoteInputChannel.java:299)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.announceBufferSize(SingleInputGate.java:389)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.announceBufferSize(InputGateWithMetrics.java:102)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.streaming.runtime.tasks.bufferdebloat.BufferDebloater.recalculateBufferSize(BufferDebloater.java:118)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:795)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:784)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_282]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:814)
 ~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) 
~[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 [flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) 
[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:754) 
[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
[flink-dist_2.12-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]

{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24467) Set min and max buffer size even if the difference is less than the threshold

2021-10-07 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24467:
-

 Summary: Set min and max buffer size even if the difference is less than the threshold
 Key: FLINK-24467
 URL: https://issues.apache.org/jira/browse/FLINK-24467
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


Right now, we apply a new buffer size only if it differs from the old buffer size by more than the configured threshold. But if the old buffer size is closer to the max or min value than this threshold, we are stuck on that value forever. For example, if the old buffer size is 22k and our threshold is 50%, then the smallest value we could apply would be 33k, which is impossible because the max value is 32k; so once the calculated buffer size reaches 22k, it is impossible to increase it.

The suggestion is to apply the change every time the new value is calculated to be the min or max size and the old value was different.
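
A minimal sketch of the suggested behaviour (illustrative, not the actual BufferDebloater code): clamp the calculated size to [min, max] first, and apply the change whenever the clamped value hits one of the bounds and differs from the old value, even if the difference is below the threshold.

{noformat}
static int newAnnouncedSize(int oldSize, int calculatedSize, int minSize, int maxSize,
                            double thresholdFraction) {
    int clamped = Math.max(minSize, Math.min(maxSize, calculatedSize));
    boolean hitsBound = clamped == minSize || clamped == maxSize;
    boolean aboveThreshold =
            Math.abs(oldSize - clamped) > Math.max(oldSize, clamped) * thresholdFraction;
    // e.g. oldSize = 22000, maxSize = 32000, 50% threshold: the clamped value 32000
    // hits the bound, so it is applied even though the difference is below 50%
    return (hitsBound || aboveThreshold) && clamped != oldSize ? clamped : oldSize;
}
{noformat}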



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24294) Resources leak in the StreamTask constructor

2021-09-15 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24294:
-

 Summary: Resources leak in the StreamTask constructor
 Key: FLINK-24294
 URL: https://issues.apache.org/jira/browse/FLINK-24294
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov


Since we initialize a lot of resources in the StreamTask constructor (RecordWriter, timerServices, etc.), it is possible that some of these resources remain open if an exception happens after their initialization in the same constructor.
So in my opinion, we have two choices here:
* Avoid allocating resources in the constructor, which allows us to do something like:
{noformat}
StreamTask task = new StreamTask(); //no leaks if it fails
try { 
  task.init();
  
} finally {
  task.cleanUp();
}
{noformat}
* or we can rewrite the code in such a way that an exception in any constructor (e.g. StreamTask) guarantees releasing the resources allocated earlier in this constructor. But that is not so easy to implement (see the initialization of recordWriter in the StreamTask constructor).

So perhaps it makes sense to separate creating the object from initialization (allocating resources).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24233) Receiving new buffer size before network reader configured

2021-09-09 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24233:
-

 Summary: Receiving new buffer size before network reader configured
 Key: FLINK-24233
 URL: https://issues.apache.org/jira/browse/FLINK-24233
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.14.1



{noformat}
2021-09-09 14:36:42,383 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Map -> Flat Map (71/75)#0 (7a5b971e0cd57aa5d057a114e2679b03) 
switched from RUNNING to FAILED with failure c
ause: 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Fatal error at remote task manager 
'ip-172-31-22-183.eu-central-1.compute.internal/172.31.22.183:42085'.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:339)
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:240)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at 
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: No reader for receiverId = 
296559f497c54a82534945f4549b9e2d exists.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.obtainReader(PartitionRequestQueue.java:194)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.notifyNewBufferSize(PartitionRequestQueue.java:188)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:134)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
at 

[jira] [Created] (FLINK-24231) Buffer debloating microbenchmark for multiple gates

2021-09-09 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24231:
-

 Summary: Buffer debloating microbenchmark for multiple gates
 Key: FLINK-24231
 URL: https://issues.apache.org/jira/browse/FLINK-24231
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.15.0


We need to expand the microbenchmark from https://issues.apache.org/jira/browse/FLINK-24230 with a scenario where different gates have:
* different throughput
* different record sizes



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24230) Buffer debloating microbenchmark for single gate

2021-09-09 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24230:
-

 Summary: Buffer debloating microbenchmark for single gate
 Key: FLINK-24230
 URL: https://issues.apache.org/jira/browse/FLINK-24230
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.15.0


Currently, there are no microbenchmarks that check buffer debloating effectiveness. The idea is to create one which measures the checkpoint time. The benchmark should be similar to `UnalignedCheckpointTimeBenchmark`, but unlike `UnalignedCheckpointTimeBenchmark`, where we see the effect of `Buffer debloat` only for extremely small values like 1ms for BUFFER_DEBLOAT_TARGET, this benchmark should provide a more reliable way to compare different implementations of `Buffer debloat`; this can be reached by increasing at least the record size and the checkpoint interval. The main target is to measure how long it takes to take a checkpoint during backpressure when all buffers are full.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24191) Adjusting number of buffers besides buffer size

2021-09-07 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24191:
-

 Summary: Adjusting number of buffers besides buffer size
 Key: FLINK-24191
 URL: https://issues.apache.org/jira/browse/FLINK-24191
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.15.0


"Buffer debloat" adjusts only the buffer size but it also makes sense to adjust 
the number of buffers. It is not clear for now what should be adjusted and in 
which proportions so it needs to think about how to figure this out.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24190) Handling large record with buffer debloat

2021-09-07 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24190:
-

 Summary: Handling large record with buffer debloat
 Key: FLINK-24190
 URL: https://issues.apache.org/jira/browse/FLINK-24190
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.15.0


If the buffer size becomes too small (less than the record size) due to buffer 
debloating, it can lead to performance degradation. It looks like it is better 
to keep the buffer size at least as large as the record size (or even greater). 
So we need to check how bad the impact can be and fix it.

The implementation should be easy: we can choose the maximum of 
desirableBufferSize and recordSize when requesting a new 
buffer (BufferWritingResultPartition#addToSubpartition); see the sketch below.
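
A minimal sketch of that idea (illustrative names only, not the actual Flink 
method):

{noformat}
public final class BufferSizeChooser {
    private BufferSizeChooser() {}

    // Never pick a buffer size smaller than the record being written, so that a
    // single record always fits into one buffer even with aggressive debloating.
    static int effectiveBufferSize(int desirableBufferSize, int recordSize) {
        return Math.max(desirableBufferSize, recordSize);
    }
}
{noformat}

In the real code the result would presumably still be capped by the maximum 
memory segment size.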



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24189) Debloating buffer for multiple gates

2021-09-07 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24189:
-

 Summary: Debloating buffer for multiple gates
 Key: FLINK-24189
 URL: https://issues.apache.org/jira/browse/FLINK-24189
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.15.0


Right now, buffer debloating assumes that it works with SingleInputGates that 
have a similar load. It needs to support UnionInputGate in the case of data 
skew. A possible implementation is to calculate the throughput separately for 
each gate, or to calculate the total throughput but choose the buffer size 
independently for each gate based on its buffers in use (a sketch follows the 
list below).

It is highly important to share the throughput fairly among all gates. In other 
words, avoid the following situation:
* gate1 has a low load while gate2 has a high load
* a small buffer size was set for gate1 and a big buffer size for gate2
* the load on gate1 increases up to the load of gate2
* it is impossible to increase the buffer size for gate1 because there is no 
reason to decrease the buffer size for gate2, since its load hasn't changed
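
A minimal, self-contained sketch of the per-gate option (all names are 
illustrative, not Flink API):

{noformat}
public final class PerGateBufferSize {
    private PerGateBufferSize() {}

    // Each gate measures its own throughput; its buffer size is chosen so that
    // the buffers currently in use hold roughly targetTimeMs worth of data.
    static int desiredBufferSize(
            long gateThroughputBytesPerSec,
            long targetTimeMs,
            int gateBuffersInUse,
            int maxBufferSize) {
        long bytesInTargetTime = gateThroughputBytesPerSec * targetTimeMs / 1000;
        int buffersInUse = Math.max(gateBuffersInUse, 1);
        long size = bytesInTargetTime / buffersInUse;
        return (int) Math.min(maxBufferSize, Math.max(1, size));
    }
}
{noformat}

The open question described above is how to keep such per-gate sizes fair when 
the total amount of network memory is shared between the gates.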



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23975) High checkpoint time for unaligned checkpoint when throttling in source

2021-08-25 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23975:
-

 Summary: High checkpoint time for unaligned checkpoint when 
throttling in source
 Key: FLINK-23975
 URL: https://issues.apache.org/jira/browse/FLINK-23975
 Project: Flink
  Issue Type: Bug
Reporter: Anton Kalashnikov


According to a brief test, adding a delay in the source before emitting (outside 
of the checkpoint lock synchronization) leads to an increased unaligned 
checkpoint time (in my case from 1 sec to 15 sec).

Approximate logic for throttling in the source:

{noformat}
@Override
public void run(SourceContext<Type> sourceContext) throws Exception {
    while (running) {
        // throttle outside of the checkpoint lock
        LockSupport.parkNanos(sleepNanos);

        synchronized (sourceContext.getCheckpointLock()) {
            sourceContext.collect(
                    new Type(ThreadLocalRandom.current().nextLong(), time, payload));
        }
    }
}
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23974) Low throughput for buffer debloat in case of different load profile

2021-08-25 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23974:
-

 Summary: Low throughput for buffer debloat in case of different 
load profile
 Key: FLINK-23974
 URL: https://issues.apache.org/jira/browse/FLINK-23974
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


According to task https://issues.apache.org/jira/browse/FLINK-23456, we have 
some performance drop when the buffer debloat is enabled:
 * for a static load with throttling in the source - 1-3%
 * for a sine-shaped load with throttling in the source - 2-3%
 * for an erratic load with throttling in the source - 5-6%

We need to investigate the reason for that and try to improve it if possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23973) FLIP-183: Buffer debloat 1.1

2021-08-25 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23973:
-

 Summary: FLIP-183: Buffer debloat 1.1
 Key: FLINK-23973
 URL: https://issues.apache.org/jira/browse/FLINK-23973
 Project: Flink
  Issue Type: New Feature
Reporter: Anton Kalashnikov


Second umbrella ticket for 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment.|https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment]

This ticket should collect tasks for improving the first version of the buffer 
debloat feature.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23928) Test the buffer debloating

2021-08-23 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23928:
-

 Summary: Test the buffer debloating
 Key: FLINK-23928
 URL: https://issues.apache.org/jira/browse/FLINK-23928
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


# Configure the buffer debloat by setting 
`taskmanager.network.memory.buffer-debloat.enabled` to true
 # Start the job with a long checkpoint time
 # Check the alignment time for each subtask; it should be near 
taskmanager.network.memory.buffer-debloat.target
 # Check the checkpoint time; it should be near taskCount * 
taskmanager.network.memory.buffer-debloat.target
 # Try to change taskmanager.network.memory.buffer-debloat.target. The 
checkpoint time should change correspondingly.
 # Check the metrics estimatedTimeToConsumeBuffersMs and debloatedBufferSize, 
which should also depend on the current debloat configuration.

Other configuration options which can be changed for the test (a hedged 
configuration sketch follows this list):
 * taskmanager.memory.min-segment-size (if this is equal to 
taskmanager.memory.segment-size then the behaviour should be the same as with 
buffer debloat disabled)
 * taskmanager.network.memory.buffer-debloat.period
 * taskmanager.network.memory.buffer-debloat.samples
 * taskmanager.network.memory.buffer-debloat.threshold-percentages
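
A hedged configuration sketch (the option keys are taken from the list above; 
the values are only example values, not recommended defaults), using the generic 
string-based Configuration API:

{noformat}
import org.apache.flink.configuration.Configuration;

public class BufferDebloatTestConfig {
    public static Configuration create() {
        Configuration config = new Configuration();
        config.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
        // target time of in-flight data; alignment/checkpoint time should follow it
        config.setString("taskmanager.network.memory.buffer-debloat.target", "1 s");
        // how often the buffer size is recalculated and how many samples are averaged
        config.setString("taskmanager.network.memory.buffer-debloat.period", "200 ms");
        config.setString("taskmanager.network.memory.buffer-debloat.samples", "20");
        return config;
    }
}
{noformat}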



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23870) Test the ability for ignoring in-flight data on recovery

2021-08-19 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23870:
-

 Summary: Test the ability for ignoring in-flight data on recovery
 Key: FLINK-23870
 URL: https://issues.apache.org/jira/browse/FLINK-23870
 Project: Flink
  Issue Type: Technical Debt
Reporter: Anton Kalashnikov


Test for  https://issues.apache.org/jira/browse/FLINK-22684:
 # configure externalized checkpoints for unaligned checkpoints
 # wait for a checkpoint, then stop the job
 # restore from the last checkpoint - it should restore the last state (optional 
step)
 # set `execution.checkpointing.recover-without-channel-state.checkpoint-id` to 
the id of the last checkpoint
 # restore from the last checkpoint - the in-flight data will be ignored, so a 
little less data than usual will be restored (see the sketch below)
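
A hedged sketch of steps 4-5 (the checkpoint path and id are placeholders; 
`execution.savepoint.path` is assumed here to be the key used to restore from an 
externalized checkpoint):

{noformat}
import org.apache.flink.configuration.Configuration;

public class RecoverWithoutChannelStateConfig {
    public static Configuration create(String lastCheckpointPath, long lastCheckpointId) {
        Configuration config = new Configuration();
        // restore from the externalized checkpoint
        config.setString("execution.savepoint.path", lastCheckpointPath);
        // ignore the in-flight (channel) state of this checkpoint on recovery
        config.setString(
                "execution.checkpointing.recover-without-channel-state.checkpoint-id",
                String.valueOf(lastCheckpointId));
        return config;
    }
}
{noformat}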

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23726) Buffer debloat configuration is taken from the wrong configuration

2021-08-11 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23726:
-

 Summary: Buffer debloat configuration is taken from the wrong 
configuration
 Key: FLINK-23726
 URL: https://issues.apache.org/jira/browse/FLINK-23726
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


Right now, StreamTask receives the buffer debloat configuration from the 
taskConfiguration, which is wrong. The right place for the debloat configuration 
is the taskManagerConfiguration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23459) New metrics for dynamic buffer size

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23459:
-

 Summary: New metrics for dynamic buffer size
 Key: FLINK-23459
 URL: https://issues.apache.org/jira/browse/FLINK-23459
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


The following metrics can be added:
 * The total size of buffered in-flight data (both detailed per gate + sum per 
subtask)

 * Total estimated time to process the data (both detailed per gate + max per 
subtask)

 * Actual value of the calculated dynamic buffer size (only detailed per gate)

All of those metrics should be exposed only on the input side, taking into 
account only input in-flight data.
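
A hedged sketch (hypothetical wiring and metric names) of how such per-gate 
gauges could be registered on the input side:

{noformat}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public final class DebloatMetrics {
    private DebloatMetrics() {}

    public static void register(MetricGroup inputGroup, DebloatState state) {
        // total size of buffered in-flight data
        inputGroup.gauge("bufferedInFlightBytes", (Gauge<Long>) state::getBufferedBytes);
        // estimated time to process the buffered data
        inputGroup.gauge("estimatedTimeToConsumeBuffersMs", (Gauge<Long>) state::getEstimatedTimeToConsumeMs);
        // actual value of the calculated dynamic buffer size
        inputGroup.gauge("debloatedBufferSize", (Gauge<Integer>) state::getLastBufferSize);
    }

    /** Hypothetical source of the measured values. */
    public interface DebloatState {
        Long getBufferedBytes();
        Long getEstimatedTimeToConsumeMs();
        Integer getLastBufferSize();
    }
}
{noformat}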



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23458) Document required number of buffers in the network stack

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23458:
-

 Summary: Document required number of buffers in the network stack
 Key: FLINK-23458
 URL: https://issues.apache.org/jira/browse/FLINK-23458
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


Currently it seems that the required number of buffers is not mentioned 
anywhere, and that this number is different from the configured number of 
exclusive/floating buffers. It should also be mentioned that both 
{{taskmanager.network.memory.floating-buffers-per-gate}} and 
{{taskmanager.network.memory.buffers-per-channel}} are best effort and not 
guaranteed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23457) Sending the buffer of the right size for broadcast

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23457:
-

 Summary: Sending the buffer of the right size for broadcast
 Key: FLINK-23457
 URL: https://issues.apache.org/jira/browse/FLINK-23457
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


It is not enough to know just the number of available buffers (credits) for the 
downstream because the size of these buffers can be different. So we are 
proposing to resolve this problem in the following way: if the downstream buffer 
size is changed, then the upstream should send a buffer of a size not greater 
than the new one, regardless of how big the current buffer on the upstream is 
(pollBuffer should receive a parameter like bufferSize and return a buffer not 
greater than it; a sketch follows below).

Downstream will be able to support any buffer size < max buffer size, so it 
should be good enough to request a BufferBuilder with the new size after getting 
the announcement, leaving existing BufferBuilder/BufferConsumers unchanged. In 
other words, the code in {{PipelinedSubpartition(View)}} doesn't need to be 
changed (apart from forwarding the new buffer size to the 
{{BufferWritingResultPartition}}). All buffer size adjustments can be 
implemented exclusively in {{BufferWritingResultPartition}}.

If different downstream subtasks have different throughput and hence different 
desired buffer sizes, then a single upstream subtask has to support having two 
different subpartitions with different buffer sizes.
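
A simplified, self-contained sketch of the size-capped pollBuffer contract (in 
the actual proposal new BufferBuilders are simply created with the new size; 
this only illustrates that the returned data chunk never exceeds the announced 
size):

{noformat}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class SizeRestrictedQueue {
    private final Deque<ByteBuffer> finishedBuffers = new ArrayDeque<>();

    public void add(ByteBuffer buffer) {
        finishedBuffers.addLast(buffer);
    }

    /** Returns the next chunk of data, never larger than maxBufferSize bytes. */
    public ByteBuffer pollBuffer(int maxBufferSize) {
        ByteBuffer next = finishedBuffers.pollFirst();
        if (next == null || next.remaining() <= maxBufferSize) {
            return next;
        }
        // hand out only the first maxBufferSize bytes; the remainder stays at the head
        ByteBuffer head = next.duplicate();
        head.limit(head.position() + maxBufferSize);
        next.position(next.position() + maxBufferSize);
        finishedBuffers.addFirst(next);
        return head.slice();
    }
}
{noformat}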



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23456) Manually test on cluster

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23456:
-

 Summary: Manually test on cluster
 Key: FLINK-23456
 URL: https://issues.apache.org/jira/browse/FLINK-23456
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


Test different jobs:
 * a job with static throughput *(must work for a successful MVP)*

 * a job with gracefully varying load (sine-shaped load?) *(should work for a 
successful MVP)*

 * a job with erratic load (emulating an evil window operator firing) *(would be 
nice to have working for the MVP)*

Test all jobs on a cluster and check that the buffer size adjustment is not 
erratic.

Verify our final goals:
 * aligned checkpoint time in backpressured jobs is reduced to sane values 
(roughly {{numberOfExchanges * configuredBufferedDataTime * 2}})

 * check whether our changes affected the throughput or not.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23454) Sending the buffer of the right size for unicast

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23454:
-

 Summary: Sending the buffer of the right size for unicast
 Key: FLINK-23454
 URL: https://issues.apache.org/jira/browse/FLINK-23454
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


It is not enough to know just the number of available buffers (credits) for the 
downstream because the size of these buffers can be different. So we are 
proposing to resolve this problem in the following way: if the downstream buffer 
size is changed, then the upstream should send a buffer of a size not greater 
than the new one, regardless of how big the current buffer on the upstream is 
(pollBuffer should receive a parameter like bufferSize and return a buffer not 
greater than it).

Downstream will be able to support any buffer size < max buffer size, so it 
should be good enough to request a BufferBuilder with the new size after getting 
the announcement, leaving existing BufferBuilder/BufferConsumers unchanged. In 
other words, the code in {{PipelinedSubpartition(View)}} doesn't need to be 
changed (apart from forwarding the new buffer size to the 
{{BufferWritingResultPartition}}). All buffer size adjustments can be 
implemented exclusively in {{BufferWritingResultPartition}}.

If different downstream subtasks have different throughput and hence different 
desired buffer sizes, then a single upstream subtask has to support having two 
different subpartitions with different buffer sizes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23453) Dynamic calculation of the buffer size

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23453:
-

 Summary: Dynamic calculation of the buffer size
 Key: FLINK-23453
 URL: https://issues.apache.org/jira/browse/FLINK-23453
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov


To calculate the desired buffer size we need to take into account the 
throughput, the configuration (timeInInputBuffer), and the actual number of 
buffers in use. It makes sense to use an EMA for this calculation to smooth out 
intermittent spikes (a small sketch follows).

The calculation based on the actual number of buffers in use helps to avoid 
problems with data skew (when only a couple of channels out of thousands have 
any data). So the solution needs to reliably and efficiently calculate either 
the estimated or the average number of buffers in use.

The buffer size can be erratic in the MVP if it is not trivial to make it stable.
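
A minimal sketch (illustrative only) of the EMA-based smoothing and the 
resulting buffer size:

{noformat}
public class ThroughputEma {
    private final double alpha; // smoothing factor in (0, 1], e.g. 2 / (N + 1)
    private double value;
    private boolean initialized;

    public ThroughputEma(double alpha) {
        this.alpha = alpha;
    }

    /** Adds one throughput sample (bytes/second) and returns the smoothed value. */
    public double add(double sample) {
        if (!initialized) {
            value = sample;
            initialized = true;
        } else {
            value = alpha * sample + (1 - alpha) * value;
        }
        return value;
    }

    /** Buffer size so that the buffers in use hold roughly targetTimeMs of data. */
    public long desiredBufferSize(long targetTimeMs, int buffersInUse) {
        double bytesInTargetTime = value * targetTimeMs / 1000.0;
        return Math.max(1L, (long) (bytesInTargetTime / Math.max(buffersInUse, 1)));
    }
}
{noformat}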



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23452) Measuring subtask throughput

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23452:
-

 Summary: Measuring subtask throughput
 Key: FLINK-23452
 URL: https://issues.apache.org/jira/browse/FLINK-23452
 Project: Flink
  Issue Type: Sub-task
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


In the first implementation, throughput could be measured for the whole subtask. 
The throughput calculation should take into account the number of bytes that 
were handled and the backpressure time, and ignore the idle time. The main idea 
is to keep the balance between idle and backpressure time: if the backpressure 
time is high we should decrease the buffer size to provide the configured 
handling time; conversely, if the subtask is idle, that period should be 
excluded from the throughput calculation. Otherwise, in the case of a network 
bottleneck, we might end up with a small buffer size that is causing the 
bottleneck in the first place, but we would not be able to increase it because 
the idle time would reduce the measured throughput and lower the buffer size. A 
minimal sketch follows.
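
A minimal sketch (illustrative only) of measuring throughput while excluding 
idle periods: the handled bytes are divided by the busy (non-idle) time only.

{noformat}
public class ThroughputCalculator {
    private long accumulatedBytes;
    private long busyTimeMs;
    private boolean idle;
    private long lastEventMs;

    public ThroughputCalculator(long nowMs) {
        this.lastEventMs = nowMs;
    }

    public void recordBytes(long bytes) {
        accumulatedBytes += bytes;
    }

    public void idleStart(long nowMs) {
        busyTimeMs += nowMs - lastEventMs;
        idle = true;
    }

    public void idleEnd(long nowMs) {
        // the time between idleStart and idleEnd is simply not counted as busy time
        idle = false;
        lastEventMs = nowMs;
    }

    /** Returns bytes per second over the busy time since the last call and resets. */
    public long calculateThroughput(long nowMs) {
        if (!idle) {
            busyTimeMs += nowMs - lastEventMs;
            lastEventMs = nowMs;
        }
        long throughput = busyTimeMs == 0 ? 0 : accumulatedBytes * 1000 / busyTimeMs;
        accumulatedBytes = 0;
        busyTimeMs = 0;
        return throughput;
    }
}
{noformat}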



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23451) FLIP-183: Dynamic buffer size adjustment

2021-07-21 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23451:
-

 Summary: FLIP-183: Dynamic buffer size adjustment
 Key: FLINK-23451
 URL: https://issues.apache.org/jira/browse/FLINK-23451
 Project: Flink
  Issue Type: New Feature
Reporter: Anton Kalashnikov


Umbrella ticket for 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23041) Change local alignment timeout back to the global time out

2021-06-18 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23041:
-

 Summary: Change local alignment timeout back to the global time out
 Key: FLINK-23041
 URL: https://issues.apache.org/jira/browse/FLINK-23041
 Project: Flink
  Issue Type: Bug
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


Local alignment timeouts are very confusing and, especially without a timeout on 
the outputs, they can significantly delay the timeout to UC.

The problematic case is when all checkpoint barriers are received with a long 
delay because of backpressure, but they arrive at the same time. The alignment 
time can be low (milliseconds), while the start delay is ~1 minute. In that case 
the checkpoint doesn't time out to UC and passes the responsibility to time out 
downstream.

So it is not transparent for the user why and when AC switches to UC. As 
mentioned before, the start delay is not correlated with the alignment timeout 
because it doesn't take into account the time in the output buffer, and the 
alignment time is not fully correlated with the alignment timeout because it 
doesn't take into account the barrier announcement.

Based on this, there is a proposal to change the semantics of the 
alignmentTimeout configuration to the following:

*The time between the start of the checkpoint (on the checkpoint coordinator) 
and the time when the checkpoint barrier is received by the task.*

By this definition, we will have a kind of global timeout which says that if the 
AC isn't finished within alignmentTimeout it will be switched to UC. A small 
sketch of this check follows.
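
A small, hedged sketch of the proposed check (names are illustrative):

{noformat}
public final class GlobalAlignmentTimeout {
    private GlobalAlignmentTimeout() {}

    // The decision is based on the trigger time of the checkpoint on the
    // checkpoint coordinator, not on the local arrival time of the first
    // barrier or its announcement.
    static boolean shouldTimeOutToUnaligned(
            long checkpointTriggerTimestampMs, long nowMs, long alignmentTimeoutMs) {
        return nowMs - checkpointTriggerTimestampMs > alignmentTimeoutMs;
    }
}
{noformat}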



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23034) NPE in JobDetailsDeserializer while reading the old version of ExecutionState

2021-06-18 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-23034:
-

 Summary: NPE in JobDetailsDeserializer while reading the old 
version of ExecutionState
 Key: FLINK-23034
 URL: https://issues.apache.org/jira/browse/FLINK-23034
 Project: Flink
  Issue Type: Bug
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


There is no compatibility for ExecutionState:

{noformat}
java.lang.NullPointerException
    at org.apache.flink.runtime.messages.webmonitor.JobDetails$JobDetailsDeserializer.deserialize(JobDetails.java:308)
    at org.apache.flink.runtime.messages.webmonitor.JobDetails$JobDetailsDeserializer.deserialize(JobDetails.java:278)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3479)
    at org.apache.flink.runtime.messages.webmonitor.JobDetailsTest.testJobDetailsCompatibleUnmarshalling(JobDetailsTest.java:82)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22961) Incorrect calculation of alignment timeout for LocalInputChannel

2021-06-10 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-22961:
-

 Summary: Incorrect calculation of alignment timeout for 
LocalInputChannel
 Key: FLINK-22961
 URL: https://issues.apache.org/jira/browse/FLINK-22961
 Project: Flink
  Issue Type: Bug
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov


Right now, the calculation of the alignment timeout happens inside 
SingleCheckpointBarrierHandler 
(org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.ControllerImpl#isTimedOut)
 and it is based on firstBarrierArrivalTime. The firstBarrierArrivalTime is 
recalculated only when a barrier announcement is received, but if we receive the 
first checkpoint barrier from a LocalInputChannel, which doesn't support 
announcements, the calculation of the alignment timeout will be based on the 
firstBarrierArrivalTime from the previous checkpoint, which is wrong.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22684) Add the ability to ignore in-flight data on recovery

2021-05-17 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-22684:
-

 Summary: Add the ability to ignore in-flight data on recovery
 Key: FLINK-22684
 URL: https://issues.apache.org/jira/browse/FLINK-22684
 Project: Flink
  Issue Type: Improvement
Reporter: Anton Kalashnikov


The main case:
 * We want to restore the last unaligned checkpoint.
 * In-flight data of this checkpoint is corrupted.
 * We want to ignore this corrupted data and restore only states.

The idea is to have a new configuration parameter 
('ignoreInFlightDataOnRecovery' or similar). If it is set to true, the metadata 
of the in-flight data is ignored on the Checkpoint Coordinator side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22379) Introduce a new JobStatus to avoid premature checkpoint triggering

2021-04-20 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-22379:
-

 Summary: Introduce a new JobStatus to avoid premature checkpoint 
triggering
 Key: FLINK-22379
 URL: https://issues.apache.org/jira/browse/FLINK-22379
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Anton Kalashnikov


Right now, when JobStatus switches to RUNNING it allows the CheckpointCoordinator 
to trigger checkpoints, which is ok. But unfortunately, JobStatus switches to 
RUNNING before the TaskState (ExecutionState) switches even to SCHEDULED. This 
leads to several problems, one of which you can see in the log:
{noformat}
WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job bc943302f92d979824fbc8f4cabc5db3.
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: EventSource -> Timestamps/Watermarks (1/7) of job bc943302f92d979824fbc8f4cabc5db3 has not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
    at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_272]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
{noformat}
To avoid this problem, it is a good idea to introduce a new JobStatus between 
CREATED and RUNNING (RESTORING?). Then:
 * JobStatus CREATED switches to RESTORING at the point where CREATED currently 
switches to RUNNING
 * JobStatus RESTORING switches to RUNNING when all tasks have switched their 
states from INITIALIZING to RUNNING


It also makes sense to rename ExecutionState.INITIALIZING to RESTORING in order 
to have the same name for the job and the task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22215) Rename RECOVERING state to RESTORING

2021-04-12 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-22215:
-

 Summary: Rename RECOVERING state to RESTORING
 Key: FLINK-22215
 URL: https://issues.apache.org/jira/browse/FLINK-22215
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Anton Kalashnikov


Task https://issues.apache.org/jira/browse/FLINK-17012 introduced a new 
ExecutionState - RECOVERING, which restores data from the last checkpoint. There 
are thoughts that RECOVERING is not a suitable name for this case and it is much 
better to rename it to RESTORING.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)