[jira] [Created] (FLINK-35933) Skip distributing maxAllowedWatermark if there are no subtasks

2024-07-30 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35933:
-

 Summary: Skip distributing maxAllowedWatermark if there are no 
subtasks
 Key: FLINK-35933
 URL: https://issues.apache.org/jira/browse/FLINK-35933
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0


On the JM, `SourceCoordinator.announceCombinedWatermark` executes unnecessarily if 
there are no subtasks to distribute maxAllowedWatermark to. This involves Heap and 
ConcurrentHashMap accesses and lots of logging.
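
The idea can be illustrated with an early return. The following is only a minimal sketch under stated assumptions: `WatermarkAnnouncerSketch`, its fields, and the "minimum plus allowed drift" computation are hypothetical stand-ins and not the actual `SourceCoordinator` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for a coordinator; not Flink's SourceCoordinator. */
final class WatermarkAnnouncerSketch {
    // subtask id -> last reported watermark (assumed bookkeeping structure)
    private final Map<Integer, Long> reportedWatermarks = new ConcurrentHashMap<>();
    // records what would be sent to subtasks, so the effect is observable
    private final List<String> sentEvents = new ArrayList<>();

    void reportWatermark(int subtaskId, long watermark) {
        reportedWatermarks.put(subtaskId, watermark);
    }

    /** Returns true if a combined watermark was computed and distributed. */
    boolean announceCombinedWatermark(long maxAllowedDrift) {
        if (reportedWatermarks.isEmpty()) {
            // No subtasks to notify: skip aggregation, map accesses and logging.
            return false;
        }
        long min = Long.MAX_VALUE;
        for (long watermark : reportedWatermarks.values()) {
            min = Math.min(min, watermark);
        }
        // Assumption: the announced value is the minimum plus the allowed drift.
        long maxAllowedWatermark = min + maxAllowedDrift;
        for (Integer subtask : reportedWatermarks.keySet()) {
            sentEvents.add("subtask " + subtask + " <- " + maxAllowedWatermark);
        }
        return true;
    }

    List<String> sentEvents() {
        return sentEvents;
    }
}
```

With no reported subtasks the method returns immediately and nothing is recorded; once subtasks report, the combined value is distributed as before.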



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35787) DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a proper shutdown)

2024-07-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35787:
-

 Summary: DefaultSlotStatusSyncer might bring down JVM (exit code 
239 instead of a proper shutdown)
 Key: FLINK-35787
 URL: https://issues.apache.org/jira/browse/FLINK-35787
 Project: Flink
  Issue Type: Bug
Reporter: Roman Khachatryan


In our internal CI, I've encountered the following error:
{code:java}
* 12:02:47,205 [   pool-126-thread-1] ERROR 
org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 
'pool-126-thread-1' produced an uncaught exception. Stopping the process...
  java.util.concurrent.CompletionException: 
java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
 completed, task = 
java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
java.util.concurrent.>
          at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:951)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2282) 
~[?:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645)
 ~[classes/:?]
          at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$checkResourceRequirementsWithDelay$12(FineGrainedSlotManager.java:603)
 ~[classes/:?]
          at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
          at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
          at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
          at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
          at java.lang.Thread.run(Thread.java:829) [?:?]
  Caused by: java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not
 completed, task = 
java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = 
java.util.concurrent.CompletableFuture$UniHandle@f3d>
          at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
 ~[?:?]
          at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) 
~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
 ~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
 ~[?:?]
          at 
java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705)
 ~[?:?]
          at 
java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
 ~[?:?]
          at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:949)
 ~[?:?]
          ... 11 more{code}
From the code, it looks like the RM main thread executor was shut down, and that 
triggered the JVM exit:
{code:java}
        CompletableFuture requestFuture =
                gateway.requestSlot(
                        SlotID.getDynamicSlotID(resourceId),
                        jobId,
                        allocationId,
                        resourceProfile,
                        targetAddress,
                        resourceManagerId,
                        taskManagerRequestTimeout);
        CompletableFuture returnedFuture = new CompletableFuture<>();
        FutureUtils.assertNoException(
                requestFuture.handleAsync(
                        (Acknowledge acknowledge, Throwable throwable) -> { ... },
                        mainThreadExecutor));
{code}
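
The JDK behaviour behind the stack trace can be reproduced in isolation. This is a minimal sketch using only `java.util.concurrent`; the class name and the variable names are illustrative and it does not use any Flink classes. It shows that `handleAsync` on an already shut down executor yields a future that fails with a `RejectedExecutionException`, which an "assert no exception" wrapper would then treat as fatal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/** Illustrative only; mirrors the shape of the snippet above with plain JDK types. */
final class RejectedHandleAsyncSketch {

    /** Returns the failure observed when handling a result on a shut down executor. */
    static Throwable failureAfterShutdown() {
        ExecutorService mainThreadExecutor = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor.shutdownNow(); // models the executor being stopped first

        CompletableFuture<String> requestFuture = CompletableFuture.completedFuture("ack");
        try {
            CompletableFuture<String> handled =
                    requestFuture.handleAsync((ack, throwable) -> ack, mainThreadExecutor);
            handled.join();
            return null; // not expected: the handler could not have been scheduled
        } catch (CompletionException e) {
            return e.getCause(); // rejection surfaced through the returned future
        } catch (RejectedExecutionException e) {
            return e; // rejection thrown directly by the executor
        }
    }

    public static void main(String[] args) {
        System.out.println(failureAfterShutdown());
    }
}
```

A caller that wants a clean shutdown would have to either check the executor state before scheduling or tolerate this specific failure instead of escalating it.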
 





[jira] [Created] (FLINK-35786) NPE in BlobServer / shutdownHook

2024-07-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35786:
-

 Summary: NPE in BlobServer / shutdownHook
 Key: FLINK-35786
 URL: https://issues.apache.org/jira/browse/FLINK-35786
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.19.1
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0, 1.19.2


In its constructor, BlobServer registers a shutdown hook to close the socket.

Later in the constructor, BlobServer creates this socket (and makes sure it's not 
null).

But if the shutdown hook is invoked before the socket is opened, an NPE is 
thrown:
{code:java}
  12:02:49,983 [PermanentBlobCache shutdown hook] INFO  
org.apache.flink.runtime.blob.PermanentBlobCache             [] - Shutting down 
BLOB cache
  12:02:49,985 [BlobServer shutdown hook] ERROR 
org.apache.flink.runtime.blob.BlobServer                     [] - Error during 
shutdown of BlobServer via JVM shutdown hook.
  java.lang.NullPointerException: null
          at 
org.apache.flink.runtime.blob.BlobServer.close(BlobServer.java:358) 
~[classes/:?]
          at 
org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39)
 ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
          at java.lang.Thread.run(Thread.java:829) [?:?]
 {code}
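
One way to make such a hook safe is a null guard in `close()`. The sketch below is an assumption-laden illustration: `HookSafeServerSketch`, `runHookNow` and `isSocketClosed` are hypothetical names, the hook is run by hand instead of being registered with the JVM, and this is not the BlobServer implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

/** Hypothetical sketch; the names are illustrative and this is not Flink's BlobServer. */
final class HookSafeServerSketch {
    // Assigned late in the constructor, so a hook created earlier may still observe null.
    private volatile ServerSocket serverSocket;
    private final Runnable hookBody;

    HookSafeServerSketch(boolean openSocket) {
        // Step 1: the hook exists before the socket does. Real code would register
        // new Thread(hookBody) via Runtime.getRuntime().addShutdownHook(...).
        this.hookBody = this::close;
        // Step 2: the socket is opened later; openSocket=false models the hook
        // firing between the two steps.
        if (openSocket) {
            try {
                this.serverSocket = new ServerSocket(); // unbound, so no port is needed
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    void close() {
        ServerSocket socket = serverSocket; // read the field once
        if (socket == null) {
            return; // hook ran before the socket was created: nothing to close
        }
        try {
            socket.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs the hook body on the calling thread, as the JVM would at shutdown. */
    boolean runHookNow() {
        hookBody.run();
        return true;
    }

    boolean isSocketClosed() {
        return serverSocket != null && serverSocket.isClosed();
    }
}
```

An alternative with the same effect would be to register the hook only after the socket has been created.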





[jira] [Created] (FLINK-35769) State files might not be deleted on task cancellation

2024-07-05 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35769:
-

 Summary: State files might not be deleted on task cancellation
 Key: FLINK-35769
 URL: https://issues.apache.org/jira/browse/FLINK-35769
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.19.1
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0


We have a job in an infinite (fast) restart loop that’s crashing with a 
serialization issue.
The problem is that each restart seems to leak state files (the ones from the 
previous run are not cleaned up):

{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep KeyedProcessOperator | wc -l
7990}}
{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep StreamingJoinOperator | wc -l
689}}
Eventually TM will use too much disk space.

 

The problem is in 
[https://github.com/apache/flink/blob/64f745a5b1fc14a2cba1ddd977ab8e8db9cf45a4/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L75]
{code:java}
try {
    List<CompletableFuture<Void>> futures =
            transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
                    .collect(Collectors.toList());
    // Wait until either all futures completed successfully or one failed exceptionally.
    FutureUtils.completeAll(futures).get();
} catch (Exception e) {
    downloadRequests.stream()
            .map(StateHandleDownloadSpec::getDownloadDestination)
            .map(Path::toFile)
            .forEach(FileUtils::deleteDirectoryQuietly);
{code}
Here, {{FileUtils::deleteDirectoryQuietly}} lists the files and deletes them.
But if {{completeAll}} is interrupted, a download runnable that is still running 
might re-create the directory afterwards.
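
The interleaving can be made deterministic in a small, self-contained sketch. Everything here is hypothetical (class name, file names, the latch that forces the ordering), and waiting for in-flight transfers before deleting is just one possible remedy assumed for illustration, not necessarily the fix applied in Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of the race; not the RocksDBStateDownloader code. */
final class DownloadCleanupSketch {

    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.deleteIfExists(p);
        }
    }

    /** Returns true if the download destination is gone after cleanup. */
    static boolean cleanUp(boolean waitForTransfers) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Path root = Files.createTempDirectory("download-sketch");
            Path dest = root.resolve("state");
            CountDownLatch cleanupDone = new CountDownLatch(1);
            pool.submit(() -> {
                if (!waitForTransfers) {
                    cleanupDone.await(); // force the bad interleaving: write after cleanup
                }
                Files.createDirectories(dest);
                Files.write(dest.resolve("file.sst"), new byte[] {1});
                return null;
            });
            pool.shutdown();
            if (waitForTransfers) {
                pool.awaitTermination(10, TimeUnit.SECONDS); // let in-flight work finish first
            }
            deleteRecursively(dest);
            cleanupDone.countDown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
            boolean gone = !Files.exists(dest);
            deleteRecursively(root); // tidy up the temp directory
            return gone;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

`cleanUp(false)` leaves the directory behind because the writer runs after the delete, while `cleanUp(true)` removes it because the delete happens only once the writer has finished.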





[jira] [Created] (FLINK-35742) Don't create RocksDB CF if task cancellation is in progress

2024-07-02 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35742:
-

 Summary: Don't create RocksDB CF if task cancellation is in 
progress
 Key: FLINK-35742
 URL: https://issues.apache.org/jira/browse/FLINK-35742
 Project: Flink
  Issue Type: Improvement
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0








[jira] [Created] (FLINK-35557) MemoryManager only reserves memory per consumer type once

2024-06-07 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35557:
-

 Summary: MemoryManager only reserves memory per consumer type once
 Key: FLINK-35557
 URL: https://issues.apache.org/jira/browse/FLINK-35557
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends, Runtime / Task
Affects Versions: 1.18.1, 1.19.0, 1.17.2, 1.16.3, 1.20.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0, 1.19.1


# In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we 
[create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526]
 a reserve function
 # The function 
[decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61]
 the available Slot memory and fails if there's not enough memory
 # We pass it to {{SharedResources.getOrAllocateSharedResource}}
 # In {{SharedResources.getOrAllocateSharedResource}} , we check if the 
resource (memory) was already reserved by some key (e.g. 
{{{}state-rocks-managed-memory{}}})
 # If not, we create a new one and call the reserve function
 # If the resource was already reserved (not null), we do NOT reserve the 
memory again: 
[https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flin[…]/main/java/org/apache/flink/runtime/memory/SharedResources.java|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71]


So there will be only one (the first) memory reservation, e.g. for RocksDB, 
no matter how many state backends are created. This means that managed memory 
limits are not enforced.
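
The steps above can be modelled in a few lines. This is a simplified sketch of the described behaviour only: `SharedReservationSketch` and its methods are hypothetical and do not reproduce the `MemoryManager` or `SharedResources` code.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of the behaviour described above; not Flink's SharedResources. */
final class SharedReservationSketch {
    private long availableBytes;
    private final Map<String, Long> reservedByKey = new HashMap<>();

    SharedReservationSketch(long budgetBytes) {
        this.availableBytes = budgetBytes;
    }

    // The "reserve function": decrements the budget or fails.
    private void reserve(long bytes) {
        if (bytes > availableBytes) {
            throw new IllegalStateException("not enough memory");
        }
        availableBytes -= bytes;
    }

    /** Reserves only when the key is seen for the first time. */
    long getOrAllocate(String key, long bytes) {
        Long existing = reservedByKey.get(key);
        if (existing != null) {
            return existing; // later callers share the resource: no further reservation
        }
        reserve(bytes);
        reservedByKey.put(key, bytes);
        return bytes;
    }

    long availableBytes() {
        return availableBytes;
    }
}
```

In this model, two callers asking for 40 bytes each under the same key reduce a budget of 100 to 60, not 20, which is the accounting gap the issue describes.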





[jira] [Created] (FLINK-35556) Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED

2024-06-07 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35556:
-

 Summary: Wrong constant in 
RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED
 Key: FLINK-35556
 URL: https://issues.apache.org/jira/browse/FLINK-35556
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.19.0, 1.20.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0, 1.19.1


See 
https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java#L39





[jira] [Created] (FLINK-35501) Use common thread pools when transferring RocksDB state files

2024-05-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35501:
-

 Summary: Use common thread pools when transferring RocksDB state 
files
 Key: FLINK-35501
 URL: https://issues.apache.org/jira/browse/FLINK-35501
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.20.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0


Currently, each RocksDB state backend creates an executor backed by a thread 
pool.

This makes it difficult to control the total number of threads per TM because 
it might have at least one task per slot and theoretically, many state backends 
per task (because of chaining).

Additionally, using a common thread pool makes it possible to indirectly control 
the load on the underlying DFS (e.g. the total number of requests to S3 from a TM).
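
The effect of a shared pool can be sketched with plain JDK executors. The names below (`SharedTransferPoolSketch`, `peakConcurrency`) and the sleep that stands in for a file transfer are assumptions for illustration; this is not the RocksDB state backend code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: many "backends" submit transfers to one shared pool. */
final class SharedTransferPoolSketch {

    /** Runs all transfers on ONE pool and returns the peak number running at once. */
    static int peakConcurrency(int backends, int transfersPerBackend, int poolSize) {
        ExecutorService sharedPool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int b = 0; b < backends; b++) {
                for (int t = 0; t < transfersPerBackend; t++) {
                    futures.add(sharedPool.submit(() -> {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        try {
                            Thread.sleep(5); // stands in for a file transfer
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        running.decrementAndGet();
                    }));
                }
            }
            for (Future<?> f : futures) {
                f.get(); // wait for every transfer
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            sharedPool.shutdown();
        }
        return peak.get();
    }
}
```

However many backends submit work, the number of concurrent transfers never exceeds `poolSize`, whereas one pool per backend would scale the thread count with the number of backends.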





[jira] [Created] (FLINK-34994) JobIDLoggingITCase fails because of "checkpoint confirmation for unknown task"

2024-04-03 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-34994:
-

 Summary: JobIDLoggingITCase fails because of "checkpoint 
confirmation for unknown task"
 Key: FLINK-34994
 URL: https://issues.apache.org/jira/browse/FLINK-34994
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.20.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58640&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=8735]

 

[https://github.com/apache/flink/actions/runs/8502821551/job/23287730632#step:10:8131]

 

[https://github.com/apache/flink/actions/runs/8507870399/job/23300810619#step:10:8086]

 

 





[jira] [Created] (FLINK-34559) TVF Window Aggregations might stuck

2024-03-01 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-34559:
-

 Summary: TVF Window Aggregations might stuck
 Key: FLINK-34559
 URL: https://issues.apache.org/jira/browse/FLINK-34559
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.18.1, 1.19.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan


RecordsWindowBuffer flushes buffered records in the following cases:
 * watermark
 * checkpoint barrier
 * buffer overflow

 

In two-phase aggregations, this creates the following problems:

1) Local aggregation: enters hard backpressure because, on flush, it outputs 
the data downstream without checking network buffer availability.

This already disrupts normal checkpointing and watermark progression.

 

2) Global aggregation: 

When the window is large enough and/or the watermark is lagging, lots of data 
is flushed to state backend (and the state is updated) in checkpoint SYNC phase.

 

All this eventually causes checkpoint timeouts (10 minutes in our env).

 

Example query
{code:java}
INSERT INTO `target_table` 

SELECT window_start, window_end, some, attributes, SUM(view_time) AS 
total_view_time, COUNT(*) AS num, LISTAGG(DISTINCT page_url) AS pages 

FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR($rowtime), INTERVAL '1' HOUR)) 

GROUP BY window_start, window_end, some, attributes;{code}
 

As a quick fix, we might want to:
 # limit the amount of data buffered in Global Aggregation nodes
 # disable two-phase aggregations, i.e. Local Aggregations (we can try to limit 
buffering there too, but network buffer availability cannot be easily checked 
from the operator)





[jira] [Created] (FLINK-34420) Various YARN tests fail after failing to download hadoop.tar.gz

2024-02-10 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-34420:
-

 Summary: Various YARN tests fail after failing to download 
hadoop.tar.gz
 Key: FLINK-34420
 URL: https://issues.apache.org/jira/browse/FLINK-34420
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.19.0, 1.18.2, 1.20.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.19.0, 1.18.2, 1.20.0


https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1702&view=logs&j=0e31ee24-31a6-528c-a4bf-45cde9b2a14e&t=696bc156-f753-5888-468e-42d78df39222&l=11334

https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1700&view=logs&j=bbbd0720-137e-5f59-95a5-b5d332f196d3&t=4769aa47-e87b-5ecd-1fb2-14d52396866d&l=9937

```
2024-02-09T19:21:57.8947690Z Feb 09 19:21:57 Pre-downloading Hadoop tarball
2024-02-09T19:21:57.9250518Z   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
2024-02-09T19:21:57.9250844Z                                  Dload  Upload   Total   Spent    Left  Speed
2024-02-09T19:21:57.9251910Z 
2024-02-09T19:21:58.0005684Z   0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
2024-02-09T19:21:58.0006583Z 100   288  100   288    0     0   3789      0 --:--:-- --:--:-- --:--:--  3789
```
The download is only 288 bytes, which is way too small, meaning we got a redirect 
(https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1700&view=logs&j=bbbd0720-137e-5f59-95a5-b5d332f196d3&t=4769aa47-e87b-5ecd-1fb2-14d52396866d&l=8056
 )

Later, it can't be unpacked:
https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1700&view=logs&j=bbbd0720-137e-5f59-95a5-b5d332f196d3&t=4769aa47-e87b-5ecd-1fb2-14d52396866d&l=9657
```
#11 [ 7/28] COPY hadoop.tar.gz /tmp/hadoop.tar.gz
#11 CACHED

#12 [ 8/28] RUN set -x && mkdir -p /usr/local/hadoop && tar -xf 
/tmp/hadoop.tar.gz --strip-components=1 -C /usr/local/hadoop && rm 
/tmp/hadoop.tar.gz*
#12 0.175 + mkdir -p /usr/local/hadoop
#12 0.177 + tar -xf /tmp/hadoop.tar.gz --strip-components=1 -C /usr/local/hadoop
#12 0.178 tar: This does not look like a tar archive
#12 0.179 
#12 0.179 gzip: stdin: not in gzip format
#12 0.179 tar: Child returned status 1
#12 0.179 tar: Error is not recoverable: exiting now

```






[jira] [Created] (FLINK-34417) Add JobID to logging MDC

2024-02-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-34417:
-

 Summary: Add JobID to logging MDC
 Key: FLINK-34417
 URL: https://issues.apache.org/jira/browse/FLINK-34417
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / Coordination, Runtime 
/ Task
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.19.0


Adding JobID to logging MDC allows applying structured logging 
and analyzing Flink logs more efficiently.





[jira] [Created] (FLINK-34344) Wrong JobID in CheckpointStatsTracker

2024-02-02 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-34344:
-

 Summary: Wrong JobID in CheckpointStatsTracker
 Key: FLINK-34344
 URL: https://issues.apache.org/jira/browse/FLINK-34344
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.19.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.19.0


The job id is generated randomly:
```
public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) {
    this(numRememberedCheckpoints, metricGroup, new JobID(), Integer.MAX_VALUE);
}
```
This affects how it is logged (or reported elsewhere).





[jira] [Created] (FLINK-33590) CheckpointStatsTracker.totalNumberOfSubTasks not updated

2023-11-17 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-33590:
-

 Summary: CheckpointStatsTracker.totalNumberOfSubTasks not updated
 Key: FLINK-33590
 URL: https://issues.apache.org/jira/browse/FLINK-33590
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.18.1


On rescaling, the DoP (degree of parallelism) is obtained from the JobGraph. 
However, JobGraph vertices are not updated once created. This results in 
missing traces on rescaling (isComplete returns false).

Instead, the DoP should be obtained from the DoP store.





[jira] [Created] (FLINK-33442) UnsupportedOperationException thrown from RocksDBIncrementalRestoreOperation

2023-11-02 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-33442:
-

 Summary: UnsupportedOperationException thrown from 
RocksDBIncrementalRestoreOperation
 Key: FLINK-33442
 URL: https://issues.apache.org/jira/browse/FLINK-33442
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.17.1
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.17.2


When using the new rescaling API, it's possible to get
{code:java}
2023-10-31 18:25:05,179 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Caught unexpected exception.
java.lang.UnsupportedOperationException: null
at java.util.Collections$1.remove(Collections.java:4714) ~[?:?]
at java.util.AbstractCollection.remove(AbstractCollection.java:299) 
~[?:?]
at 
org.apache.flink.runtime.checkpoint.StateObjectCollection.remove(StateObjectCollection.java:105)
 ~[flink-runtime-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithRescaling(RocksDBIncrementalRestoreOperation.java:294)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:167)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:327)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:512)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:99)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:338)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:355)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:166)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:735)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:710)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:676)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 [flink-runtime-1.17.1-143.jar:1.17.1-143]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) 
[flink-runtime-1.17.1-143.jar:1.17.1-143]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
[flink-runtime-1.17.1-143.jar:1.17.1-143]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[flink-runtime-1.17.1-143.jar:1.17.1-143]
at java.lang.Thread.run(Thread.java:829) [?:?]
2023-10-31 18:25:05,182 WARN  
org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
Exception while restoring keyed state backend for 
KeyedProcessOperator_353a6b34b8b7f1c1d0fb4616d911049c_(1/2) from alternative 
(1/2), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected 
exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:407)
 ~[flink-dist-1.17.1-143.jar:1.17.1-143]
at 

[jira] [Created] (FLINK-31601) While waiting for resources, resources check might be scheduled unlimited number of times (Adaptive Scheduler)

2023-03-23 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-31601:
-

 Summary: While waiting for resources, resources check might be 
scheduled unlimited number of times (Adaptive Scheduler)
 Key: FLINK-31601
 URL: https://issues.apache.org/jira/browse/FLINK-31601
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.17.0
Reporter: Roman Khachatryan
 Fix For: 1.17.1


See [https://github.com/apache/flink/pull/22169#discussion_r1136395017]
{quote}when {{resourceStabilizationDeadline}} is not null, should we skip 
scheduling {{checkDesiredOrSufficientResourcesAvailable}} (on [line 
166|https://github.com/apache/flink/blob/a64781b1ef8f129021bdcddd3b07548e6caa4a72/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java#L166])?
Otherwise, we schedule as many checks as there are changes in resources.
{quote}
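
The guard suggested in the quote could look roughly like this. It is a hypothetical sketch: `StabilizationCheckSketch`, the in-memory queue standing in for a delayed executor, and the reset of the deadline after the check are all assumptions and not the `WaitingForResources` implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of "schedule the check at most once"; not Flink's WaitingForResources. */
final class StabilizationCheckSketch {
    private final Queue<Runnable> scheduled = new ArrayDeque<>(); // stand-in for a delayed executor
    private Long resourceStabilizationDeadline; // null means no check is pending
    private int checksRun;

    void onNewResourcesAvailable(long nowMillis, long stabilizationTimeoutMillis) {
        if (resourceStabilizationDeadline != null) {
            return; // a check is already scheduled: do not schedule another one
        }
        resourceStabilizationDeadline = nowMillis + stabilizationTimeoutMillis;
        scheduled.add(this::checkResources);
    }

    private void checkResources() {
        checksRun++;
        resourceStabilizationDeadline = null; // assumption: allow a new check afterwards
    }

    /** Runs everything that has been scheduled so far. */
    void runScheduled() {
        Runnable next;
        while ((next = scheduled.poll()) != null) {
            next.run();
        }
    }

    int pendingChecks() {
        return scheduled.size();
    }

    int checksRun() {
        return checksRun;
    }
}
```

Five resource changes in a row leave a single pending check instead of five, which bounds the number of scheduled tasks regardless of how often resources change.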





[jira] [Created] (FLINK-31261) Make AdaptiveScheduler aware of the (local) state size

2023-02-28 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-31261:
-

 Summary: Make AdaptiveScheduler aware of the (local) state size
 Key: FLINK-31261
 URL: https://issues.apache.org/jira/browse/FLINK-31261
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.18.0


FLINK-21450 makes the Adaptive Scheduler aware of Local Recovery.

Each slot-group pair is assigned a score based on a keyGroupRange size.

That score isn't always optimal; it could be improved by computing the score 
based on the actual state size on disk.





[jira] [Created] (FLINK-30073) Managed memory can be wasted if rocksdb memory is fixed-per-slot

2022-11-17 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-30073:
-

 Summary: Managed memory can be wasted if rocksdb memory is 
fixed-per-slot
 Key: FLINK-30073
 URL: https://issues.apache.org/jira/browse/FLINK-30073
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.15.2, 1.16.0, 1.17.0
Reporter: Roman Khachatryan


When 
[state.backend.rocksdb.memory.fixed-per-slot|https://github.com/apache/flink/blob/ba4b182955867fedfa9891bf0bf430e92eeab41a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java#L75]
 is set, RocksDB does not use managed memory.

However, the runtime [doesn't take this into 
account|https://github.com/apache/flink/blob/ba4b182955867fedfa9891bf0bf430e92eeab41a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java#L75]
 and still reserves the managed memory according to the configured weights.





[jira] [Created] (FLINK-29985) SlotTable not close on TM termination

2022-11-10 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-29985:
-

 Summary: SlotTable not close on TM termination
 Key: FLINK-29985
 URL: https://issues.apache.org/jira/browse/FLINK-29985
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.16.0, 1.15.3
Reporter: Roman Khachatryan


When a slot is released, the associated resources are released as well, in 
particular, MemoryManager. MemoryManager might hold not only memory, but also 
some arbitrary shared resources (currently, PythonSharedResources and 
RocksDBSharedResources).

When TM is stopped by the JobManager, its slot table is closed, causing all its 
slots to be released.

When TM is stopped by SIGTERM (i.e. by an external resource manager), its slot 
table is NOT closed.

That means that in standalone clusters, some resources might not be released.

 

As of now, RocksDBSharedResources contains only ephemeral resources.

Not sure about PythonSharedResources, but likely it is associated with a 
separate process.





[jira] [Created] (FLINK-29928) Allow sharing (RocksDB) memory between slots

2022-11-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-29928:
-

 Summary: Allow sharing (RocksDB) memory between slots
 Key: FLINK-29928
 URL: https://issues.apache.org/jira/browse/FLINK-29928
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Configuration, Runtime / State Backends, 
Runtime / Task
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.17.0


h1. Background and motivation

RocksDB is one of the main consumers of off-heap memory, which it uses for 
BlockCache, MemTables, Indices and Bloom Filters.
Since 1.10 (FLINK-7289), it is possible to:
- share these objects among RocksDB instances of the same slot
- bound the total memory usage by all RocksDB instances of a TM

The memory is divided between the slots equally (unless using fine-grained 
resource control).
This is sub-optimal if some slots contain more memory-intensive tasks than the 
others.

The proposal is to widen the scope of sharing memory to TM so that it can be 
shared across all its RocksDB instances.
That would reduce the overall memory consumption at the cost of resource 
isolation.

h1. Proposed changes

h2. Configuration
- introduce "taskmanager.memory.managed.shared-fraction" (0..1, default 0)
-- cluster-level (yaml only)
-- the non-shared memory will be used as it is now (exclusively per-slot)
- introduce "state.backend.memory.share-scope"
-- job-level (yaml and StateBackend)
-- possible values: NONE, SLOT, TASK_MANAGER
-- default: not set
-- override "state.backend.rocksdb.memory.fixed-per-slot" if both are set (but 
don't deprecate it, because it specifies the size)
- rely on the existing "state.backend.rocksdb.memory.managed" to decide whether 
the shared memory is managed or unmanaged
- when calculating TM-wise shared  memory, ignore 
"taskmanager.memory.managed.consumer-weights" because RocksDB is the only 
consumer so far
- similarly, exclude StateBackend from weights calculations, so other consumers 
(e.g. PYTHON) can better utilize exclusive slot memory
- use cluster-level or default configuration when creating TM-wise shared 
RocksDB objects, e.g.  "state.backend.rocksdb.memory.managed", 
"state.backend.rocksdb.memory.write-buffer-ratio"

h2. Example
{code}
taskmanager.memory.managed.size: 1gb
taskmanager.memory.managed.shared-fraction: .75 # all slots share 750Mb of shared managed memory
taskmanager.numberOfTaskSlots: 10               # each task slot gets 25Mb of exclusive managed memory
cluster.fine-grained-resource-management.enabled: false

job 1:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: true

job 2:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: true

job 3:
state.backend.memory.share-scope: SLOT
state.backend.rocksdb.memory.managed: true

job 4:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: false

job 5:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: false
{code}
Jobs 1 and 2 will use the same 750Mb of managed memory and will compete with 
each other.
Job 3 will only use exclusive slot memory (25Mb per slot).
Jobs 4 and 5 will use the same 750Mb of unmanaged memory and will compete with 
each other.

Python code (or other consumers) will be able to use up to 25Mb per slot in 
jobs 1, 2, 4 and 5.
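
The numbers in the example follow from simple arithmetic. A minimal sketch is given below, assuming "1gb" is rounded to 1000Mb as the example does; the class and method names are illustrative only and not part of Flink.

```java
public class SharedManagedMemoryExample {

    /** Managed memory shared by all slots of the TM, in megabytes. */
    static long sharedMb(long managedMb, double sharedFraction) {
        return Math.round(managedMb * sharedFraction);
    }

    /** Managed memory that remains exclusive to each slot, in megabytes. */
    static long exclusivePerSlotMb(long managedMb, double sharedFraction, int slots) {
        long exclusiveTotal = managedMb - sharedMb(managedMb, sharedFraction);
        return exclusiveTotal / slots;
    }

    public static void main(String[] args) {
        long managedMb = 1000;        // taskmanager.memory.managed.size: 1gb (rounded)
        double sharedFraction = 0.75; // taskmanager.memory.managed.shared-fraction
        int slots = 10;               // taskmanager.numberOfTaskSlots

        System.out.println("shared: " + sharedMb(managedMb, sharedFraction) + "Mb");
        System.out.println(
                "exclusive per slot: "
                        + exclusivePerSlotMb(managedMb, sharedFraction, slots)
                        + "Mb");
    }
}
```

With the default shared fraction of 0, the shared part is empty and each slot keeps its current exclusive share.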

h2. Creating and sharing RocksDB objects
Introduce sharedMemoryManager to TaskManager.
Then, similarly to the current slot-wise sharing:
- Memory manager manages OpaqueMemoryResource
- The Cache object is created by the backend code on the first call

This way, flink-runtime doesn't have to depend on the state backend.
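
A rough sketch of the idea, not the actual MemoryManager API: the TM-level holder only knows about opaque AutoCloseable resources, and the backend passes in the factory that creates the cache on the first call. The class name SharedResourcesSketch and the resource key are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical TM-level holder of opaque shared resources. */
public class SharedResourcesSketch implements AutoCloseable {

    private final Map<String, AutoCloseable> resources = new ConcurrentHashMap<>();

    /**
     * Returns the resource registered under the given type, creating it with the
     * supplied initializer on the first call only. The initializer comes from the
     * backend code, so this class needs no compile-time dependency on it.
     */
    @SuppressWarnings("unchecked")
    public <T extends AutoCloseable> T getOrCreate(String type, Supplier<T> initializer) {
        return (T) resources.computeIfAbsent(type, key -> initializer.get());
    }

    /** Intended to be called on TM termination only, never on job or task completion. */
    @Override
    public void close() {
        for (AutoCloseable resource : resources.values()) {
            try {
                resource.close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to release shared resource", e);
            }
        }
        resources.clear();
    }

    public static void main(String[] args) {
        SharedResourcesSketch tmResources = new SharedResourcesSketch();
        AutoCloseable first = tmResources.getOrCreate("cache", () -> () -> { });
        AutoCloseable second = tmResources.getOrCreate("cache", () -> () -> { });
        System.out.println("same instance: " + (first == second));
        tmResources.close();
    }
}
```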

h2. Class loading and resolution
RocksDB state backend is already a part of the distribution.
However, if a job also includes it then classloader.resolve-order should be set 
to parent-first to prevent conflicts.

h2. Lifecycle
The cache object should be destroyed on TM termination; job or task completion 
should NOT close it.

h1. Testing
One way to test that the same RocksDB cache is used is via RocksDB metrics.

h1. Limitations
- classloader.resolve-order=child-first is not supported
- fine-grained-resource-management is not supported
- only RocksDB will be able to use TM-wise shared memory; other consumers may 
be adjusted later

cc: [~yunta], [~ym], [~liyu]





[jira] [Created] (FLINK-29158) Fix logging in DefaultCompletedCheckpointStore

2022-08-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-29158:
-

 Summary: Fix logging in DefaultCompletedCheckpointStore
 Key: FLINK-29158
 URL: https://issues.apache.org/jira/browse/FLINK-29158
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.15.2
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.3


See [https://github.com/apache/flink/pull/16582#discussion_r949214456]





[jira] [Created] (FLINK-29157) Clarify the contract between CompletedCheckpointStore and SharedStateRegistry

2022-08-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-29157:
-

 Summary: Clarify the contract between CompletedCheckpointStore and 
SharedStateRegistry
 Key: FLINK-29157
 URL: https://issues.apache.org/jira/browse/FLINK-29157
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Checkpointing
Affects Versions: 1.15.2, 1.16.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.16.0, 1.15.3


After FLINK-24611, CompletedCheckpointStore is required to call 
SharedStateRegistry.unregisterUnusedState() on checkpoint subsumption and 
shutdown.

Although it's not clear whether CompletedCheckpointStore is internal, there are 
in fact external implementations (which weren't updated accordingly).

 

After FLINK-25872, CompletedCheckpointStore also must call 
checkpointsCleaner.cleanSubsumedCheckpoints.

 

Another issue with a custom implementation was the use of different Java objects 
for state in CheckpointStore and SharedStateRegistry (after FLINK-24086).

 

So it makes sense to:
 * clarify the contract (different in 1.15 and 1.16)
 * require using the same checkpoint objects by SharedStateRegistryFactory and 
CompletedCheckpointStore
 * mark the interface(s) as PublicEvolving





[jira] [Created] (FLINK-28976) Changelog 1st materialization delayed unnecessarily

2022-08-15 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-28976:
-

 Summary: Changelog 1st materialization delayed unnecessarily
 Key: FLINK-28976
 URL: https://issues.apache.org/jira/browse/FLINK-28976
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.1, 1.16.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.16.0, 1.15.2


In PeriodicMaterializationManager.start(), the 1st materialization is scheduled 
with a delay: materialization_interval + random_offset 

Here, random_offset is added to avoid the thundering herd problem.
The next materialization will be scheduled with a delay of only 
materialization_interval.

That means that the 1st materialization will have to compact up to 2 times more 
state changes than the subsequent ones. 

This, in turn, can cause FLINK-26590 or other problems.
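
To illustrate the difference in delays, here is a minimal sketch. The first method mirrors the formula described above; the second is only one possible alternative (an assumption, not necessarily the actual fix), which keeps the random offset but drops the extra full interval. It assumes random_offset is drawn from [0, materialization_interval); all names are hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;

public class MaterializationDelaySketch {

    /** Delay as described in the ticket: a full interval plus the random offset. */
    static long initialDelayAsDescribed(long intervalMs, long randomOffsetMs) {
        return intervalMs + randomOffsetMs;
    }

    /** Assumed alternative: use only the random offset for the first run. */
    static long initialDelayOffsetOnly(long randomOffsetMs) {
        return randomOffsetMs;
    }

    public static void main(String[] args) {
        long intervalMs = 10_000;
        // Assumption: the offset is uniformly distributed in [0, interval).
        long offsetMs = ThreadLocalRandom.current().nextLong(intervalMs);

        System.out.println("as described: " + initialDelayAsDescribed(intervalMs, offsetMs));
        System.out.println("offset only:  " + initialDelayOffsetOnly(offsetMs));
    }
}
```

In the first variant the first materialization can start almost two intervals after startup; in the second it starts within one interval while subtasks are still spread out.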





[jira] [Created] (FLINK-28931) BlockingPartitionBenchmark doesn't compile

2022-08-11 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-28931:
-

 Summary: BlockingPartitionBenchmark doesn't compile
 Key: FLINK-28931
 URL: https://issues.apache.org/jira/browse/FLINK-28931
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.16.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan


{code}
10:15:12  [ERROR] 
/home/jenkins/workspace/flink-master-benchmarks-java8/flink-benchmarks/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java:117:50:
  error: cannot find symbol
{code}

Caused by
https://github.com/apache/flink/commit/9f5d0c48f198ff69a175f630832687ba02cf4c3e#diff-f72e79ebd747b6fde91988d65de9121a5907c97e4630cb1e30ab65601b4d9753R79





[jira] [Created] (FLINK-28647) Remove separate error handling and adjust documentation for CLAIM mode + RocksDB native savepoint

2022-07-22 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-28647:
-

 Summary: Remove separate error handling and adjust documentation 
for CLAIM mode + RocksDB native savepoint
 Key: FLINK-28647
 URL: https://issues.apache.org/jira/browse/FLINK-28647
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Checkpointing
Affects Versions: 1.16.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.16.0


After FLINK-25872, checkpoint folder deletion is not performed as long as there 
is some state from that checkpoint used by other checkpoints.
Therefore, the following changes could be reverted/adjusted:
* FLINK-25745 e8bcbfd5a48fd8d3ca48ef7803867569214e0dbc Do not log exception
* FLINK-25745 c1f5c5320150402fc0cb4fbf3a31f9a27b1e4d9a Document incremental 
savepoints in CLAIM mode limitation

cc: [~Yanfei Lei]





[jira] [Created] (FLINK-28597) Empty checkpoint folders not deleted on job cancellation if their shared state is still in use

2022-07-18 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-28597:
-

 Summary: Empty checkpoint folders not deleted on job cancellation 
if their shared state is still in use
 Key: FLINK-28597
 URL: https://issues.apache.org/jira/browse/FLINK-28597
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.16.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.16.0


After FLINK-25872, SharedStateRegistry registers all state handles, including 
private ones.
Once the state isn't used AND the checkpoint is subsumed, it will actually be 
discarded.
This is done to prevent premature deletion when recovering in CLAIM mode:
1. RocksDB native savepoint folder (shared state is stored in chk-xx folder so 
it might fail the deletion)
2. Initial non-changelog checkpoint when switching to changelog-based 
checkpoints (private state of the initial checkpoint might be included into 
later checkpoints and its deletion would invalidate them)

Additionally, checkpoint folders are kept for a longer time, which might 
be confusing.
In case of a crash, more folders will remain.

cc: [~Yanfei Lei], [~ym]





[jira] [Created] (FLINK-27571) Recognize "less is better" benchmarks in regression detection script

2022-05-11 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-27571:
-

 Summary: Recognize "less is better" benchmarks in regression 
detection script
 Key: FLINK-27571
 URL: https://issues.apache.org/jira/browse/FLINK-27571
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.16.0
Reporter: Roman Khachatryan
 Attachments: Screenshot_2022-05-09_10-33-11.png

http://codespeed.dak8s.net:8000/timeline/#/?exe=5=schedulingDownstreamTasks.BATCH=on=on=off=2=200





[jira] [Created] (FLINK-27556) Performance regression in checkpointSingleInput.UNALIGNED on 29.04.2022

2022-05-09 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-27556:
-

 Summary: Performance regression in checkpointSingleInput.UNALIGNED 
on 29.04.2022
 Key: FLINK-27556
 URL: https://issues.apache.org/jira/browse/FLINK-27556
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks, Runtime / Checkpointing
Affects Versions: 1.16.0
Reporter: Roman Khachatryan


http://codespeed.dak8s.net:8000/timeline/#/?exe=1=checkpointSingleInput.UNALIGNED=on=on=off=2=200





[jira] [Created] (FLINK-27555) Performance regression in schedulingDownstreamTasks on 02.05.2022

2022-05-09 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-27555:
-

 Summary: Performance regression in schedulingDownstreamTasks on 
02.05.2022
 Key: FLINK-27555
 URL: https://issues.apache.org/jira/browse/FLINK-27555
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.16.0
Reporter: Roman Khachatryan


http://codespeed.dak8s.net:8000/timeline/#/?exe=5=schedulingDownstreamTasks.BATCH=on=on=off=2=200





[jira] [Created] (FLINK-27150) Improve error reporting in Flink UI

2022-04-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-27150:
-

 Summary: Improve error reporting in Flink UI
 Key: FLINK-27150
 URL: https://issues.apache.org/jira/browse/FLINK-27150
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.16.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.16.0


- don't hide after a timeout (was: 4.5s)
- expand the message box





[jira] [Created] (FLINK-27149) KafkaSourceE2ECase.testScaleUp failed on AZP

2022-04-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-27149:
-

 Summary: KafkaSourceE2ECase.testScaleUp failed on AZP
 Key: FLINK-27149
 URL: https://issues.apache.org/jira/browse/FLINK-27149
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Tests
Affects Versions: 1.16.0
Reporter: Roman Khachatryan
 Fix For: 1.16.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34404=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=15393

{code:java}
[INFO] ---
[INFO]  T E S T S
[INFO] ---
[INFO] Running org.apache.flink.tests.util.kafka.KafkaSourceE2ECase
[ERROR] Tests run: 16, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 
174.171 s <<< FAILURE! - in org.apache.flink.tests.util.kafka.KafkaSourceE2ECase
[ERROR] 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testScaleUp(TestEnvironment,
 DataStreamSourceExternalContext, CheckpointingMode)[1]  Time elapsed: 2.7 s  
<<< ERROR!
java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator 
is suspending.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at 
org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.restartFromSavepoint(SourceTestSuiteBase.java:343)
at 
org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.testScaleUp(SourceTestSuiteBase.java:258)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at 

[jira] [Created] (FLINK-27148) UnalignedCheckpointITCase fails on AZP

2022-04-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-27148:
-

 Summary: UnalignedCheckpointITCase fails on AZP
 Key: FLINK-27148
 URL: https://issues.apache.org/jira/browse/FLINK-27148
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Network
Affects Versions: 1.15.0, 1.16.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0, 1.16.0


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=5812]

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=6018]

 {code}
[ERROR] Tests run: 22, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 
174.732 s <<< FAILURE! - in 
org.apache.flink.test.checkpointing.UnalignedCheckpointITCase
[ERROR] UnalignedCheckpointITCase.execute  Time elapsed: 6.408 s  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at 
org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:184)
at 
org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:287)
at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.Verifier$1.evaluate(Verifier.java:35)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at 

[jira] [Created] (FLINK-27144) Provide timeout details when calling FutureUtils.orTimeout

2022-04-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-27144:
-

 Summary: Provide timeout details when calling FutureUtils.orTimeout
 Key: FLINK-27144
 URL: https://issues.apache.org/jira/browse/FLINK-27144
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.14.4, 1.13.6, 1.15.0, 1.16.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0, 1.16.0


There are two versions of FutureUtils.orTimeout() that use null as an error 
message when the timeout happens. They are used by (in 1.16):
 * DefaultScheduler.registerProducedPartitions  
 * DeclarativeSlotPoolBridge.internalRequestNewSlot  
 * CompletedOperationCache.closeAsync  
 * TaskManagerRunner.onFatalError  
 * RestClusterClient.getWebMonitorBaseUrl

This makes it difficult to debug those timeouts, in particular during the 
shutdown. See 
[this|https://lists.apache.org/thread/5wxv2occohc6ky1g754n7o8b8ssjcqf5] thread 
for example.

 

Replacing null with an actual message would ease debugging; the message 
could be made mandatory.
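
As an illustration only, a timeout helper with a mandatory message could look roughly like the sketch below. It uses plain JDK classes and is not the actual FutureUtils code; the class name, the parameter order and the example message are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutWithMessageSketch {

    /** Fails the future with a TimeoutException carrying the given message. */
    static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future,
            long timeout,
            TimeUnit unit,
            ScheduledExecutorService timer,
            String timeoutMessage) {
        ScheduledFuture<?> timeoutTask =
                timer.schedule(
                        () -> {
                            // No effect if the future has already completed.
                            future.completeExceptionally(new TimeoutException(timeoutMessage));
                        },
                        timeout,
                        unit);
        // Cancel the pending timer task once the future completes in any way.
        future.whenComplete((value, error) -> timeoutTask.cancel(false));
        return future;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> pending = new CompletableFuture<>();
            orTimeout(pending, 50, TimeUnit.MILLISECONDS, timer, "Slot request timed out");
            pending.get();
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getMessage());
        } finally {
            timer.shutdownNow();
        }
    }
}
```

With a message attached, the exception in the logs identifies which operation timed out rather than showing a bare TimeoutException.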





[jira] [Created] (FLINK-27132) CheckpointResourcesCleanupRunner might discard shared state of the initial checkpoint

2022-04-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-27132:
-

 Summary: CheckpointResourcesCleanupRunner might discard shared 
state of the initial checkpoint
 Key: FLINK-27132
 URL: https://issues.apache.org/jira/browse/FLINK-27132
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.15.0, 1.16.0
Reporter: Roman Khachatryan


Consider the following case:
 # A job starts from a checkpoint in NO_CLAIM mode, with incremental checkpoints 
enabled
 # It produces some new checkpoints and subsumes the original one (not 
discarding shared state - before FLINK-24611 or after FLINK-26985)
 # Job terminates abruptly
 # The cleaner is started for that job
 # ZK doesn't have the initial checkpoint, so the store will load only the new 
checkpoints (created in 2). Shared state is registered
 # The store is shut down - discarding all the checkpoints and also any shared 
state

 
In 6, if some checkpoint uses the initial state, it will also be discarded
 
[~mapohl] could you please confirm this?
 
cc: [~yunta]





[jira] [Created] (FLINK-27114) On JM restart, the information about the initial checkpoints can be lost

2022-04-07 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-27114:
-

 Summary: On JM restart, the information about the initial 
checkpoints can be lost
 Key: FLINK-27114
 URL: https://issues.apache.org/jira/browse/FLINK-27114
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.4, 1.15.0, 1.16.0
Reporter: Roman Khachatryan
 Fix For: 1.16.0, 1.14.5, 1.15.1


Scenario (1.14):
 # A job starts from an existing checkpoint 1, with incremental checkpoints 
enabled
 # Checkpoint 1 is loaded with discardOnSubsume=false by 
CheckpointCoordinator.restoreSavepoint
 # A new checkpoint 2 completes, it reuses some state from the initial 
checkpoint
 # At some point, checkpoint 1 is subsumed, but the state is not discarded 
(thanks to discardOnSubsume=false, ref counts stay 1)
 # JM crashes
 # JM restarts and loads checkpoints 2..x from ZK (or another store) with 
discardOnSubsume=true (as deserialized from handles)
 # At some point, checkpoint 2 is subsumed and the initial shared state is not 
used anymore; because checkpoint 2 has discardOnSubsume=true, shared state will 
be erroneously discarded

In 1.15, there were the following changes:
 # RestoreMode was added; only NO_CLAIM and LEGACY modes are affected
 # SharedStateRegistry was changed from refCounts to highest checkpoint ID
 # In step (7), state will not be discarded; however, because it's impossible 
to distinguish initial state from the state created by this job, the latter 
will not be discarded as well, leading to left-over state artifacts.

The proposed solution is to store the initial checkpoint ID (in a store such as 
ZK or in checkpoints) and adjust step 6 or 7.
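
One way the adjusted step 7 might work is sketched below. This is a hypothetical, simplified registry and not the actual SharedStateRegistry: it assumes every entry records the ID of the checkpoint that created it and that the initial checkpoint ID survives a JM restart. State that is no longer used is dropped from the registry, but only state created after the initial checkpoint is reported for deletion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical registry that never discards state of the initial checkpoint. */
public class InitialCheckpointAwareRegistry {

    private static final class Entry {
        final long createdByCheckpointId;
        long lastUsedCheckpointId;

        Entry(long createdByCheckpointId, long lastUsedCheckpointId) {
            this.createdByCheckpointId = createdByCheckpointId;
            this.lastUsedCheckpointId = lastUsedCheckpointId;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long initialCheckpointId;

    public InitialCheckpointAwareRegistry(long initialCheckpointId) {
        this.initialCheckpointId = initialCheckpointId;
    }

    /** Registers a use of the state by a checkpoint, tracking the highest user ID. */
    public void register(String stateKey, long createdByCheckpointId, long usedByCheckpointId) {
        Entry entry =
                entries.computeIfAbsent(
                        stateKey, k -> new Entry(createdByCheckpointId, usedByCheckpointId));
        entry.lastUsedCheckpointId = Math.max(entry.lastUsedCheckpointId, usedByCheckpointId);
    }

    /** Returns the keys of unused state that is safe to delete. */
    public List<String> unregisterUnusedState(long lowestRetainedCheckpointId) {
        List<String> toDiscard = new ArrayList<>();
        Iterator<Map.Entry<String, Entry>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Entry> mapEntry = it.next();
            String key = mapEntry.getKey();
            Entry entry = mapEntry.getValue();
            if (entry.lastUsedCheckpointId < lowestRetainedCheckpointId) {
                it.remove();
                // State of the initial checkpoint is not owned by this job: keep the files.
                if (entry.createdByCheckpointId > initialCheckpointId) {
                    toDiscard.add(key);
                }
            }
        }
        return toDiscard;
    }
}
```

In this sketch, state created by checkpoint 1 (the initial one) is left in place when checkpoint 2 is subsumed, while state created by checkpoint 2 itself is returned for deletion.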





[jira] [Created] (FLINK-26968) Bump CopyOnWriteStateMap entry version before write

2022-03-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26968:
-

 Summary: Bump CopyOnWriteStateMap entry version before write
 Key: FLINK-26968
 URL: https://issues.apache.org/jira/browse/FLINK-26968
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.16.0








[jira] [Created] (FLINK-26967) Fix race condition in CopyOnWriteStateMap

2022-03-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26967:
-

 Summary: Fix race condition in CopyOnWriteStateMap
 Key: FLINK-26967
 URL: https://issues.apache.org/jira/browse/FLINK-26967
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.16.0








[jira] [Created] (FLINK-26966) Implement incremental checkpoints

2022-03-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26966:
-

 Summary: Implement incremental checkpoints
 Key: FLINK-26966
 URL: https://issues.apache.org/jira/browse/FLINK-26966
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26965) Allow reuse of PeriodicMaterializationManager

2022-03-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26965:
-

 Summary: Allow reuse of PeriodicMaterializationManager
 Key: FLINK-26965
 URL: https://issues.apache.org/jira/browse/FLINK-26965
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26964) Notify CheckpointStrategy about checkpoint completion/abortion

2022-03-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26964:
-

 Summary: Notify CheckpointStrategy about checkpoint 
completion/abortion
 Key: FLINK-26964
 URL: https://issues.apache.org/jira/browse/FLINK-26964
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.16.0








[jira] [Created] (FLINK-26963) Allow heap state backend creation customization

2022-03-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26963:
-

 Summary: Allow heap state backend creation customization
 Key: FLINK-26963
 URL: https://issues.apache.org/jira/browse/FLINK-26963
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26956) AZP: don't log "[ERROR] Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError"

2022-03-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26956:
-

 Summary: AZP: don't log "[ERROR] Picked up JAVA_TOOL_OPTIONS: 
-XX:+HeapDumpOnOutOfMemoryError"
 Key: FLINK-26956
 URL: https://issues.apache.org/jira/browse/FLINK-26956
 Project: Flink
  Issue Type: Improvement
  Components: Test Infrastructure
Affects Versions: 1.16.0
Reporter: Roman Khachatryan


The message makes searching for real errors more difficult.

Filtering with grep -v will probably suffice, but care is needed with exit 
codes (grep exits with status 1 when it selects no lines).





[jira] [Created] (FLINK-26912) EventTimeWindowCheckpointingITCase.testTumblingTimeWindow failed on azure

2022-03-29 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26912:
-

 Summary: EventTimeWindowCheckpointingITCase.testTumblingTimeWindow 
failed on azure
 Key: FLINK-26912
 URL: https://issues.apache.org/jira/browse/FLINK-26912
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.16.0
Reporter: Roman Khachatryan
 Fix For: 1.16.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=33856=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=13133

{code}
2022-03-28T21:19:57.3494606Z Mar 28 21:19:57 [ERROR] Tests run: 60, Failures: 
1, Errors: 0, Skipped: 0, Time elapsed: 413.405 s <<< FAILURE! - in 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase
2022-03-28T21:19:57.3496371Z Mar 28 21:19:57 [ERROR] 
EventTimeWindowCheckpointingITCase.testTumblingTimeWindow  Time elapsed: 21.757 
s  <<< FAILURE!
2022-03-28T21:19:57.3497179Z Mar 28 21:19:57 java.lang.AssertionError: Job 
completed with illegal application status: UNKNOWN.
2022-03-28T21:19:57.3497773Z Mar 28 21:19:57at 
org.junit.Assert.fail(Assert.java:89)
2022-03-28T21:19:57.3498549Z Mar 28 21:19:57at 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.testTumblingTimeWindow(EventTimeWindowCheckpointingITCase.java:350)
2022-03-28T21:19:57.3499831Z Mar 28 21:19:57at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-03-28T21:19:57.3501367Z Mar 28 21:19:57at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-03-28T21:19:57.3502820Z Mar 28 21:19:57at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-03-28T21:19:57.3503493Z Mar 28 21:19:57at 
java.lang.reflect.Method.invoke(Method.java:498)
2022-03-28T21:19:57.3504135Z Mar 28 21:19:57at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2022-03-28T21:19:57.3504854Z Mar 28 21:19:57at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2022-03-28T21:19:57.3505569Z Mar 28 21:19:57at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2022-03-28T21:19:57.3506256Z Mar 28 21:19:57at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2022-03-28T21:19:57.3506979Z Mar 28 21:19:57 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2022-03-28T21:19:57.3508161Z Mar 28 21:19:57 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
2022-03-28T21:19:57.3509166Z Mar 28 21:19:57 at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
2022-03-28T21:19:57.3509812Z Mar 28 21:19:57 at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
2022-03-28T21:19:57.3510607Z Mar 28 21:19:57 at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
2022-03-28T21:19:57.3511226Z Mar 28 21:19:57 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-03-28T21:19:57.3511889Z Mar 28 21:19:57 at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
2022-03-28T21:19:57.3512551Z Mar 28 21:19:57 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
2022-03-28T21:19:57.3513209Z Mar 28 21:19:57 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
2022-03-28T21:19:57.3513911Z Mar 28 21:19:57 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
2022-03-28T21:19:57.3514562Z Mar 28 21:19:57 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
2022-03-28T21:19:57.3515168Z Mar 28 21:19:57 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
2022-03-28T21:19:57.3515774Z Mar 28 21:19:57 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
2022-03-28T21:19:57.3516396Z Mar 28 21:19:57 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
2022-03-28T21:19:57.3517018Z Mar 28 21:19:57 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
2022-03-28T21:19:57.3517621Z Mar 28 21:19:57 at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
2022-03-28T21:19:57.3518185Z Mar 28 21:19:57 at org.junit.runners.Suite.runChild(Suite.java:128)
2022-03-28T21:19:57.3518729Z Mar 28 21:19:57 at org.junit.runners.Suite.runChild(Suite.java:27)
2022-03-28T21:19:57.3519282Z Mar 28 21:19:57 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
2022-03-28T21:19:57.3519888Z Mar 28 21:19:57 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
2022-03-28T21:19:57.3520621Z Mar 28 21:19:57 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
2022-03-28T21:19:57.3521390Z Mar 28 21:19:57 at

[jira] [Created] (FLINK-26853) HeapStateBackend ignores metadata updates in certain cases

2022-03-24 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26853:
-

 Summary: HeapStateBackend ignores metadata updates in certain cases
 Key: FLINK-26853
 URL: https://issues.apache.org/jira/browse/FLINK-26853
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.14.4, 1.15.0, 1.16.0
Reporter: Roman Khachatryan


On recovery, HeapRestoreOperation reads state handles one by one:
 * each handle contains metadata at the beginning;
 * the metadata is always read, but not actually used if a state with the 
corresponding name was already registered.

In the rare case of downscaling combined with multiple checkpoints that have 
different metadata, this might lead to data being deserialized incorrectly 
(always using the initial metadata).

It also prevents incremental checkpoints with schema evolution.

On first access, however, the backend itself will update (merge) the metadata, 
so new state updates are not affected.
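
A minimal sketch of the effect, assuming a heavily simplified model (this is not the HeapRestoreOperation code): each handle carries a metadata tag and a payload, and the restore loop keeps only the metadata of the first handle per state name. `MetadataRestoreSketch`, `Handle` and the `v1`/`v2` tags are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a restore loop; not the actual Flink classes. */
final class MetadataRestoreSketch {

    /** One state handle: metadata (reduced to a schema tag) followed by its data. */
    static final class Handle {
        final String stateName;
        final String schema;
        final String payload;

        Handle(String stateName, String schema, String payload) {
            this.stateName = stateName;
            this.schema = schema;
            this.payload = payload;
        }
    }

    /** Mimics the reported behaviour: metadata is read, but only the first one per state is used. */
    static List<String> restoreKeepingFirst(List<Handle> handles) {
        Map<String, String> registered = new HashMap<String, String>();
        List<String> decoded = new ArrayList<String>();
        for (Handle h : handles) {
            registered.putIfAbsent(h.stateName, h.schema); // later metadata is ignored
            decoded.add(registered.get(h.stateName) + ":" + h.payload);
        }
        return decoded;
    }

    /** Variant that decodes each handle with the metadata stored in that handle. */
    static List<String> restoreUsingOwn(List<Handle> handles) {
        List<String> decoded = new ArrayList<String>();
        for (Handle h : handles) {
            decoded.add(h.schema + ":" + h.payload);
        }
        return decoded;
    }

    public static void main(String[] args) {
        List<Handle> handles = Arrays.asList(
                new Handle("counts", "v1", "a"),
                new Handle("counts", "v2", "b"));
        System.out.println(restoreKeepingFirst(handles)); // [v1:a, v1:b]
        System.out.println(restoreUsingOwn(handles));     // [v1:a, v2:b]
    }
}
```

In the first variant the second payload is decoded with the `v1` tag although it was written with `v2`, which is the kind of mismatch described above.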



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26685) Investigate/improve tests stability when using InMemory Changelog implementation

2022-03-16 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26685:
-

 Summary: Investigate/improve tests stability when using InMemory 
Changelog implementation
 Key: FLINK-26685
 URL: https://issues.apache.org/jira/browse/FLINK-26685
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends, Test Infrastructure
Affects Versions: 1.15.0, 1.16.0
Reporter: Roman Khachatryan
 Fix For: 1.16.0


Large-scale tests often fail when using the InMemory Changelog because of 
excessive GC pressure, serialization, or exceeded memory limits.

So far, this has been fixed by switching to the FS implementation on a 
case-by-case basis. Always using FS isn't straightforward.

Investigate whether there is a way to use the FS implementation uniformly or 
to stabilize the in-memory implementation.





[jira] [Created] (FLINK-26636) EmbeddedMultiThreadDependencyTests test_add_python_file failed on azure

2022-03-14 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26636:
-

 Summary: EmbeddedMultiThreadDependencyTests test_add_python_file 
failed on azure
 Key: FLINK-26636
 URL: https://issues.apache.org/jira/browse/FLINK-26636
 Project: Flink
  Issue Type: Bug
Reporter: Roman Khachatryan


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32981=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=28024

{code}

2022-03-14T04:11:09.7985367Z Mar 14 04:11:09 
=== FAILURES ===
2022-03-14T04:11:09.7988965Z Mar 14 04:11:09 ___ 
EmbeddedMultiThreadDependencyTests.test_add_python_file 
2022-03-14T04:11:09.7989557Z Mar 14 04:11:09 
2022-03-14T04:11:09.7990098Z Mar 14 04:11:09 self = 

2022-03-14T04:11:09.7990644Z Mar 14 04:11:09 
2022-03-14T04:11:09.7991165Z Mar 14 04:11:09 def test_add_python_file(self):
2022-03-14T04:11:09.7991785Z Mar 14 04:11:09 python_file_dir = 
os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
2022-03-14T04:11:09.7992318Z Mar 14 04:11:09 os.mkdir(python_file_dir)
2022-03-14T04:11:09.7993113Z Mar 14 04:11:09 python_file_path = 
os.path.join(python_file_dir, "test_dependency_manage_lib.py")
2022-03-14T04:11:09.7994443Z Mar 14 04:11:09 with 
open(python_file_path, 'w') as f:
2022-03-14T04:11:09.7995251Z Mar 14 04:11:09 f.write("def 
add_two(a):\nraise Exception('This function should not be called!')")
2022-03-14T04:11:09.7997796Z Mar 14 04:11:09 
self.t_env.add_python_file(python_file_path)
2022-03-14T04:11:09.7998592Z Mar 14 04:11:09 
2022-03-14T04:11:09.7999298Z Mar 14 04:11:09 
python_file_dir_with_higher_priority = os.path.join(
2022-03-14T04:11:09.8000146Z Mar 14 04:11:09 self.tempdir, 
"python_file_dir_" + str(uuid.uuid4()))
2022-03-14T04:11:09.8001027Z Mar 14 04:11:09 
os.mkdir(python_file_dir_with_higher_priority)
2022-03-14T04:11:09.8001891Z Mar 14 04:11:09 
python_file_path_higher_priority = 
os.path.join(python_file_dir_with_higher_priority,
2022-03-14T04:11:09.8002526Z Mar 14 04:11:09
 "test_dependency_manage_lib.py")
2022-03-14T04:11:09.8003945Z Mar 14 04:11:09 with 
open(python_file_path_higher_priority, 'w') as f:
2022-03-14T04:11:09.8004664Z Mar 14 04:11:09 f.write("def 
add_two(a):\nreturn a + 2")
2022-03-14T04:11:09.8005208Z Mar 14 04:11:09 
self.t_env.add_python_file(python_file_path_higher_priority)
2022-03-14T04:11:09.8005648Z Mar 14 04:11:09 
2022-03-14T04:11:09.8005992Z Mar 14 04:11:09 def plus_two(i):
2022-03-14T04:11:09.8006430Z Mar 14 04:11:09 from 
test_dependency_manage_lib import add_two
2022-03-14T04:11:09.8006857Z Mar 14 04:11:09 return add_two(i)
2022-03-14T04:11:09.8007204Z Mar 14 04:11:09 
2022-03-14T04:11:09.8007605Z Mar 14 04:11:09 
self.t_env.create_temporary_system_function(
2022-03-14T04:11:09.8008206Z Mar 14 04:11:09 "add_two", 
udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT()))
2022-03-14T04:11:09.8008742Z Mar 14 04:11:09 table_sink = 
source_sink_utils.TestAppendSink(
2022-03-14T04:11:09.8009465Z Mar 14 04:11:09 ['a', 'b'], 
[DataTypes.BIGINT(), DataTypes.BIGINT()])
2022-03-14T04:11:09.8009993Z Mar 14 04:11:09 
self.t_env.register_table_sink("Results", table_sink)
2022-03-14T04:11:09.8010763Z Mar 14 04:11:09 t = 
self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
2022-03-14T04:11:09.8011324Z Mar 14 04:11:09 >   
t.select(expr.call("add_two", t.a), t.a).execute_insert("Results").wait()
2022-03-14T04:11:09.8011779Z Mar 14 04:11:09 
2022-03-14T04:11:09.8012176Z Mar 14 04:11:09 
pyflink/table/tests/test_dependency.py:63: 
2022-03-14T04:11:09.8012677Z Mar 14 04:11:09 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
2022-03-14T04:11:09.8013574Z Mar 14 04:11:09 pyflink/table/table_result.py:76: 
in wait
2022-03-14T04:11:09.8014041Z Mar 14 04:11:09 
get_method(self._j_table_result, "await")()
2022-03-14T04:11:09.8014843Z Mar 14 04:11:09 
.tox/py38-cython/lib/python3.8/site-packages/py4j/java_gateway.py:1321: in 
__call__
2022-03-14T04:11:09.8015387Z Mar 14 04:11:09 return_value = 
get_return_value(
2022-03-14T04:11:09.8015843Z Mar 14 04:11:09 pyflink/util/exceptions.py:146: in 
deco
2022-03-14T04:11:09.8016267Z Mar 14 04:11:09 return f(*a, **kw)
2022-03-14T04:11:09.8016720Z Mar 14 04:11:09 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
2022-03-14T04:11:09.8017116Z Mar 14 04:11:09 
2022-03-14T04:11:09.8017626Z Mar 14 04:11:09 answer = 'xro2722'
2022-03-14T04:11:09.8018127Z Mar 14 04:11:09 gateway_client = 

2022-03-14T04:11:09.8018812Z Mar 14 04:11:09 

[jira] [Created] (FLINK-26635) KafkaSourceE2ECase.testMultipleSplits failed on azure with Correlation id for response does not match request

2022-03-14 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26635:
-

 Summary: KafkaSourceE2ECase.testMultipleSplits failed on azure 
with Correlation id for response does not match request
 Key: FLINK-26635
 URL: https://issues.apache.org/jira/browse/FLINK-26635
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.0
Reporter: Roman Khachatryan


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32981=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=14703

{code}
2022-03-14T03:35:34.8710354Z Mar 14 03:35:34 [ERROR] Tests run: 16, Failures: 
0, Errors: 1, Skipped: 0, Time elapsed: 184.49 s <<< FAILURE! - in 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase
2022-03-14T03:35:34.8881150Z Mar 14 03:35:34 [ERROR] 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testMultipleSplits(TestEnvironment,
 DataStreamSourceExternalContext, CheckpointingMode)[1]  Time elapsed: 3.769 s  
<<< ERROR!
2022-03-14T03:35:34.8882315Z Mar 14 03:35:34 java.lang.RuntimeException: Failed 
to fetch next result
2022-03-14T03:35:34.8886537Z Mar 14 03:35:34 at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
2022-03-14T03:35:34.8887677Z Mar 14 03:35:34 at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
2022-03-14T03:35:34.672Z Mar 14 03:35:34 at org.apache.flink.connector.testframe.utils.CollectIteratorAssert.compareWithExactlyOnceSemantic(CollectIteratorAssert.java:116)
2022-03-14T03:35:34.8889715Z Mar 14 03:35:34 at org.apache.flink.connector.testframe.utils.CollectIteratorAssert.matchesRecordsFromSource(CollectIteratorAssert.java:71)
2022-03-14T03:35:34.8890700Z Mar 14 03:35:34 at org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.checkResultWithSemantic(SourceTestSuiteBase.java:769)
2022-03-14T03:35:34.8891663Z Mar 14 03:35:34 at org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.testMultipleSplits(SourceTestSuiteBase.java:218)
2022-03-14T03:35:34.8892432Z Mar 14 03:35:34 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-03-14T03:35:34.8893159Z Mar 14 03:35:34 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-03-14T03:35:34.8893940Z Mar 14 03:35:34 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-03-14T03:35:34.8894691Z Mar 14 03:35:34 at java.lang.reflect.Method.invoke(Method.java:498)
2022-03-14T03:35:34.8895694Z Mar 14 03:35:34 at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
2022-03-14T03:35:34.8896532Z Mar 14 03:35:34 at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
2022-03-14T03:35:34.8897443Z Mar 14 03:35:34 at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
2022-03-14T03:35:34.8898353Z Mar 14 03:35:34 at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
2022-03-14T03:35:34.8899200Z Mar 14 03:35:34 at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
2022-03-14T03:35:34.8901355Z Mar 14 03:35:34 at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
2022-03-14T03:35:34.8902346Z Mar 14 03:35:34 at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
2022-03-14T03:35:34.8903536Z Mar 14 03:35:34 at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
2022-03-14T03:35:34.8904460Z Mar 14 03:35:34 at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
2022-03-14T03:35:34.8905743Z Mar 14 03:35:34 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
2022-03-14T03:35:34.8906674Z Mar 14 03:35:34 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
2022-03-14T03:35:34.8907606Z Mar 14 03:35:34 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
2022-03-14T03:35:34.8908447Z Mar 14 03:35:34 at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
2022-03-14T03:35:34.8909283Z Mar 14 03:35:34 at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
2022-03-14T03:35:34.8910366Z Mar 14 03:35:34 at

[jira] [Created] (FLINK-26632) JobManagerHAProcessFailureRecoveryITCase failed due to JVM exits with code 239

2022-03-14 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26632:
-

 Summary: JobManagerHAProcessFailureRecoveryITCase failed due to 
JVM exits with code 239
 Key: FLINK-26632
 URL: https://issues.apache.org/jira/browse/FLINK-26632
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.15.0
Reporter: Roman Khachatryan


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32979=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=5486]

 {code}
 Mar 14 02:32:25 [ERROR] Error occurred in starting fork, check output in log
 Mar 14 02:32:25 [ERROR] Process Exit Code: 239
 Mar 14 02:32:25 [ERROR] Crashed tests:
 Mar 14 02:32:25 [ERROR] 
org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase
 Mar 14 02:32:25 [ERROR] at 
org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:669)
 Mar 14 02:32:25 [ERROR] at 
org.apache.maven.plugin.surefire.booterclient.ForkStarter.access$600(ForkStarter.java:115)
 Mar 14 02:32:25 [ERROR] at 
org.apache.maven.plugin.surefire.booterclient.ForkStarter$2.call(ForkStarter.java:444)
 Mar 14 02:32:25 [ERROR] at 
org.apache.maven.plugin.surefire.booterclient.ForkStarter$2.call(ForkStarter.java:420)
 Mar 14 02:32:25 [ERROR] at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
 Mar 14 02:32:25 [ERROR] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 Mar 14 02:32:25 [ERROR] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 Mar 14 02:32:25 [ERROR] at java.lang.Thread.run(Thread.java:748)
 Mar 14 02:32:25 [ERROR] -> [Help 1]

 {code}






[jira] [Created] (FLINK-26615) BatchingStateChangeUploadSchedulerTest.testRetryOnTimeout fails on azure

2022-03-11 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26615:
-

 Summary: BatchingStateChangeUploadSchedulerTest.testRetryOnTimeout 
fails on azure
 Key: FLINK-26615
 URL: https://issues.apache.org/jira/browse/FLINK-26615
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends, Tests
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32896=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24724]

{code}
[ERROR] Tests run: 10, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.103 
s <<< FAILURE! - in 
org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest
[ERROR] 
org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.testRetryOnTimeout
  Time elapsed: 0.042 s  <<< FAILURE!
java.lang.AssertionError: expected:<[0]> but was:<[]>
   at org.junit.Assert.fail(Assert.java:89)
   at org.junit.Assert.failNotEquals(Assert.java:835)
   at org.junit.Assert.assertEquals(Assert.java:120)
   at org.junit.Assert.assertEquals(Assert.java:146)
   at 
org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.testRetryOnTimeout(BatchingStateChangeUploadSchedulerTest.java:240)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
   at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
   at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
   at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
   at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
   at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
   at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
   at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
   at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
   at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
   at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
   at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
{code}





[jira] [Created] (FLINK-26592) [Changelog] Deadlock in FsStateChangelogWriter

2022-03-10 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26592:
-

 Summary: [Changelog] Deadlock in FsStateChangelogWriter
 Key: FLINK-26592
 URL: https://issues.apache.org/jira/browse/FLINK-26592
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.16.0


The issue occurs when buffer sizes are set to the minimum (e.g. 1 byte).
The task thread tries to update state -> schedules an upload of the changes -> 
waits for capacity.
Upload threads do release capacity on upload completion; however, they are 
unable to send back the results because the Writer lock is taken; therefore, 
they're unable to proceed with the next uploads.
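
A simplified model of this wait cycle, as a sketch only (it is not the FsStateChangelogWriter code): the task thread waits for the capacity of two pending uploads, while the uploader returns capacity after each upload but needs the writer lock before it can start the next one. `CapacityLockSketch`, the two-upload setup and the 500 ms bound are assumptions made for illustration; a real deadlock would wait forever.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified model of the wait cycle; not the actual Flink classes. */
final class CapacityLockSketch {

    /** Returns true if the task thread obtained the capacity it was waiting for. */
    static boolean taskGetsCapacity(boolean holdLockWhileWaiting) {
        final ReentrantLock writerLock = new ReentrantLock();
        final Semaphore capacity = new Semaphore(0); // all capacity is taken by two pending uploads

        Thread uploader = new Thread(() -> {
            for (int i = 0; i < 2; i++) {
                capacity.release(); // upload i finished, its capacity is returned
                writerLock.lock();  // handing the result back needs the writer lock
                writerLock.unlock();
            }
        });

        boolean acquired = false;
        if (holdLockWhileWaiting) {
            writerLock.lock();
        }
        try {
            uploader.start();
            // The task needs the capacity of both uploads. The timeout only keeps
            // this sketch terminating; without it the wait would never end.
            acquired = capacity.tryAcquire(2, 500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (holdLockWhileWaiting) {
                writerLock.unlock();
            }
        }
        try {
            uploader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired;
    }

    public static void main(String[] args) {
        System.out.println("lock held while waiting: " + taskGetsCapacity(true));
        System.out.println("lock not held while waiting: " + taskGetsCapacity(false));
    }
}
```

When the lock is held during the wait, the uploader can return the capacity of the first upload only and then blocks on the lock, so the wait times out. When the lock is not held, both uploads complete and the capacity is obtained.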





[jira] [Created] (FLINK-26591) Compilation fails due to RawToBinaryCastRule

2022-03-10 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26591:
-

 Summary: Compilation fails due to RawToBinaryCastRule
 Key: FLINK-26591
 URL: https://issues.apache.org/jira/browse/FLINK-26591
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32857=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=6288

{code}
2022-03-10T15:15:36.8803533Z [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) on project flink-table-planner_2.12: Compilation failure
2022-03-10T15:15:36.8805068Z [ERROR] /__w/3/s/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RawToBinaryCastRule.java:[46,5] method does not override or implement a method from a supertype
{code}

cc: [~slinkydeveloper]





[jira] [Created] (FLINK-26590) Triggered checkpoints can be delayed by discarding shared state

2022-03-10 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26590:
-

 Summary: Triggered checkpoints can be delayed by discarding shared 
state
 Key: FLINK-26590
 URL: https://issues.apache.org/jira/browse/FLINK-26590
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.15.0, 1.14.3
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.16.0


Quick note: CheckpointCleaner is not involved here.

When a checkpoint is subsumed, SharedStateRegistry schedules its unused shared 
state for async deletion. It uses the common IO pool for this and adds a 
Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete).

When a checkpoint is started, CheckpointCoordinator uses the same thread pool 
to initialize the location for it (see 
CheckpointCoordinator.initializeCheckpoint).

The thread pool is of fixed size 
([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size];
 by default it's the number of CPU cores) and uses a FIFO queue for tasks.

When there is a spike in state deletion, the next checkpoint is delayed waiting 
for an available IO thread.
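
The queueing effect can be reproduced outside Flink with a plain fixed-size executor. This is a minimal sketch, assuming one Runnable per deleted handle and one Runnable for checkpoint initialization; `IoPoolQueueSketch` and the task labels are hypothetical, and no Flink classes are involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Shows that a task submitted after a burst of other tasks waits behind them. */
final class IoPoolQueueSketch {

    /** Submits `deletions` tasks, then one checkpoint-init task; returns the execution order. */
    static List<String> runOrder(int poolSize, int deletions) {
        ExecutorService ioPool = Executors.newFixedThreadPool(poolSize);
        final List<String> order = Collections.synchronizedList(new ArrayList<String>());
        for (int i = 0; i < deletions; i++) {
            final int id = i;
            ioPool.execute(() -> order.add("delete-" + id));
        }
        // Submitted last, so it sits at the tail of the FIFO queue.
        ioPool.execute(() -> order.add("init-checkpoint"));
        ioPool.shutdown();
        try {
            ioPool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(order);
    }

    public static void main(String[] args) {
        // With a single thread the order is deterministic:
        System.out.println(runOrder(1, 3)); // [delete-0, delete-1, delete-2, init-checkpoint]
    }
}
```

With more threads the exact interleaving varies, but the initialization task still cannot start before a thread has drained the deletions queued ahead of it.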

Back-pressure seems reasonable here (similar to CheckpointCleaner); however, 
this shared state deletion could be spread across multiple subsequent 
checkpoints, not necessarily the next one.
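
One possible shape of such spreading, as a sketch only: deletions are queued and at most a fixed number is released per checkpoint. `SpreadDeletionSketch`, `maxPerCheckpoint` and `nextBatch` are hypothetical names; this is not an existing Flink API and not necessarily how the issue will be fixed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical helper that bounds how many deletions are released per checkpoint. */
final class SpreadDeletionSketch {
    private final Queue<String> pending = new ArrayDeque<String>();
    private final int maxPerCheckpoint;

    SpreadDeletionSketch(int maxPerCheckpoint) {
        this.maxPerCheckpoint = maxPerCheckpoint;
    }

    /** Remembers a handle for later deletion instead of submitting it right away. */
    void scheduleDelete(String handle) {
        pending.add(handle);
    }

    /** Called once per checkpoint: returns at most maxPerCheckpoint handles to delete now. */
    List<String> nextBatch() {
        List<String> batch = new ArrayList<String>();
        while (batch.size() < maxPerCheckpoint && !pending.isEmpty()) {
            batch.add(pending.poll());
        }
        return batch;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        SpreadDeletionSketch registry = new SpreadDeletionSketch(2);
        for (int i = 0; i < 5; i++) {
            registry.scheduleDelete("handle-" + i);
        }
        System.out.println(registry.nextBatch()); // [handle-0, handle-1]
        System.out.println(registry.nextBatch()); // [handle-2, handle-3]
        System.out.println(registry.nextBatch()); // [handle-4]
    }
}
```

The trade-off is that unused state stays on storage longer, so a bound like this would need to be weighed against storage growth.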

 

I believe the issue is a pre-existing one, but it particularly affects the 
changelog state backend, because 1) such spikes are likely there; 2) workloads 
are latency sensitive.

In the tests, checkpoint duration grows from seconds to minutes immediately 
after the materialization.





[jira] [Created] (FLINK-26485) [Changelog] State not discarded after multiple retries

2022-03-03 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26485:
-

 Summary: [Changelog] State not discarded after multiple retries
 Key: FLINK-26485
 URL: https://issues.apache.org/jira/browse/FLINK-26485
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0








[jira] [Created] (FLINK-26455) [Changelog] Materialization interleaved with task cancellation can fail the job

2022-03-02 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26455:
-

 Summary: [Changelog] Materialization interleaved with task 
cancellation can fail the job
 Key: FLINK-26455
 URL: https://issues.apache.org/jira/browse/FLINK-26455
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0








[jira] [Created] (FLINK-26396) [Changelog] Upload is not failed even if all attempts timeout

2022-02-28 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26396:
-

 Summary: [Changelog] Upload is not failed even if all attempts 
timeout
 Key: FLINK-26396
 URL: https://issues.apache.org/jira/browse/FLINK-26396
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0








[jira] [Created] (FLINK-26372) Allow Changelog Storage configuration per program

2022-02-25 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26372:
-

 Summary: Allow Changelog Storage configuration per program
 Key: FLINK-26372
 URL: https://issues.apache.org/jira/browse/FLINK-26372
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration, Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.16.0


It's currently possible to override state.backend.changelog.enabled per job, 
but it's not possible to override Changelog storage (i.e. writer) settings.

There should be 1) an API and 2) runtime support.

See this 
[discussion|https://github.com/apache/flink/pull/16341#discussion_r663749681] 
and the corresponding 
[TODO|https://github.com/apache/flink/pull/16341/files#diff-2c21555dcab689ec27c0ab981852a2bfa787695fb2fe04b24c22b89c63d98b73R680].
 





[jira] [Created] (FLINK-26306) Triggered checkpoints can be delayed by discarding shared state

2022-02-22 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26306:
-

 Summary: Triggered checkpoints can be delayed by discarding shared 
state
 Key: FLINK-26306
 URL: https://issues.apache.org/jira/browse/FLINK-26306
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.3, 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


Quick note: CheckpointCleaner is not involved here.

When a checkpoint is subsumed, SharedStateRegistry schedules its unused shared 
state for async deletion. It uses the common IO pool for this and adds a 
Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete).

When a checkpoint is started, CheckpointCoordinator uses the same thread pool 
to initialize the location for it (see 
CheckpointCoordinator.initializeCheckpoint).

The thread pool is of fixed size 
([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size];
 by default it's the number of CPU cores) and uses a FIFO queue for tasks.

When there is a spike in state deletion, the next checkpoint is delayed waiting 
for an available IO thread.

I believe the issue is an old one, but it particularly affects the changelog 
state backend, because 1) such spikes are likely; 2) workloads are latency 
sensitive.







[jira] [Created] (FLINK-26255) SplitAggregateITCase.testAggWithJoin failed on azure

2022-02-18 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26255:
-

 Summary: SplitAggregateITCase.testAggWithJoin failed on azure
 Key: FLINK-26255
 URL: https://issues.apache.org/jira/browse/FLINK-26255
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31850=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=10497]

 
{code:java}
[ERROR] Tests run: 64, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 
700.545 s <<< FAILURE! - in 
org.apache.flink.table.planner.runtime.stream.sql.SplitAggregateITCase
[ERROR] SplitAggregateITCase.testAggWithJoin  Time elapsed: 601.77 s  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
   at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
   at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
   at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
   at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
   at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
   at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
   at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
   at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
   at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
   at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
   at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
   at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
   at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
   at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
   at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
   at akka.dispatch.OnComplete.internal(Future.scala:300)
   at akka.dispatch.OnComplete.internal(Future.scala:297)
   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
   at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
   at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
   at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
   at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
   at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
 {code}





[jira] [Created] (FLINK-26231) [Changelog] Incorrect MaterializationID passed to ChangelogStateBackendHandleImpl

2022-02-17 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26231:
-

 Summary: [Changelog] Incorrect MaterializationID passed to 
ChangelogStateBackendHandleImpl
 Key: FLINK-26231
 URL: https://issues.apache.org/jira/browse/FLINK-26231
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


In ChangelogStateBackendHandleImpl constructor, materializationID and 
persistedSizeOfThisCheckpoint are mixed up.
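
This kind of mix-up compiles silently when both parameters have the same primitive type. A minimal sketch, assuming a hypothetical `Handle` with two `long` parameters (not the real ChangelogStateBackendHandleImpl signature), plus tiny wrapper types as one possible way to let the compiler catch the swap:

```java
/** Illustrates swapped same-typed constructor arguments; all names are hypothetical. */
final class SwappedArgsSketch {

    /** Stand-in for a handle that takes two long-typed values. */
    static final class Handle {
        final long materializationId;
        final long persistedSize;

        Handle(long materializationId, long persistedSize) {
            this.materializationId = materializationId;
            this.persistedSize = persistedSize;
        }
    }

    /** Wrapper types so that the two values are no longer interchangeable. */
    static final class MaterializationId {
        final long value;

        MaterializationId(long value) {
            this.value = value;
        }
    }

    static final class PersistedSize {
        final long value;

        PersistedSize(long value) {
            this.value = value;
        }
    }

    /** Passing the wrappers in the wrong order here would be a compile error. */
    static Handle typed(MaterializationId id, PersistedSize size) {
        return new Handle(id.value, size.value);
    }

    public static void main(String[] args) {
        long materializationId = 7L;
        long persistedSize = 4096L;

        Handle swapped = new Handle(persistedSize, materializationId); // compiles, fields are mixed up
        Handle ok = typed(new MaterializationId(materializationId), new PersistedSize(persistedSize));

        System.out.println(swapped.materializationId + " vs " + ok.materializationId); // 4096 vs 7
    }
}
```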

 

cc: [~yunta]





[jira] [Created] (FLINK-26198) ArchitectureTest fails on AZP (table.api.StatementSet)

2022-02-16 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26198:
-

 Summary: ArchitectureTest fails on AZP (table.api.StatementSet)
 Key: FLINK-26198
 URL: https://issues.apache.org/jira/browse/FLINK-26198
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Roman Khachatryan


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31681=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=26849

{code}
[INFO] Running org.apache.flink.architecture.rules.ApiAnnotationRules
[ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 48.583 
s <<< FAILURE! - in org.apache.flink.architecture.rules.ApiAnnotationRules
[ERROR] 
ApiAnnotationRules.PUBLIC_EVOLVING_API_METHODS_USE_ONLY_PUBLIC_EVOLVING_API_TYPES
  Time elapsed: 0.282 s  <<< FAILURE!
java.lang.AssertionError: 
Architecture Violation [Priority: MEDIUM] - Rule 'Return and argument types of 
methods annotated with @PublicEvolving must be annotated with 
@Public(Evolving).' was violated (1 times):
org.apache.flink.table.api.StatementSet.compilePlan(): Returned leaf type 
org.apache.flink.table.api.CompiledPlan does not satisfy: reside outside of 
package 'org.apache.flink..' or annotated with @Public or annotated with 
@PublicEvolving or annotated with @Deprecated
{code}
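
The gist of the violated rule can be approximated with plain reflection. This is a rough sketch only: it does not reproduce the actual architecture test, it checks return types but not argument types, and the `PublicEvolving` annotation, `Plan` and `Api` below are local stand-ins defined inside the sketch, not the Flink types.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Reflection-based approximation of the annotation rule; all types are local stand-ins. */
final class AnnotationRuleSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @interface PublicEvolving {}

    /** Stand-in for a return type that lacks the annotation. */
    static final class Plan {}

    @PublicEvolving
    static final class AnnotatedPlan {}

    interface Api {
        @PublicEvolving
        Plan compilePlan();

        @PublicEvolving
        AnnotatedPlan compileAnnotatedPlan();

        @PublicEvolving
        String explain();
    }

    /** Lists annotated methods whose return type is declared in this sketch but is not annotated. */
    static List<String> violations(Class<?> api) {
        List<String> result = new ArrayList<String>();
        String ownPrefix = AnnotationRuleSketch.class.getName();
        for (Method m : api.getDeclaredMethods()) {
            if (!m.isAnnotationPresent(PublicEvolving.class)) {
                continue;
            }
            Class<?> type = m.getReturnType();
            // Types from outside (e.g. java.lang.String) are exempt, like in the rule text.
            boolean ownType = type.getName().startsWith(ownPrefix);
            if (ownType && !type.isAnnotationPresent(PublicEvolving.class)) {
                result.add(m.getName());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(violations(Api.class)); // [compilePlan]
    }
}
```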






[jira] [Created] (FLINK-26165) SavepointFormatITCase fails on azure

2022-02-15 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26165:
-

 Summary: SavepointFormatITCase fails on azure
 Key: FLINK-26165
 URL: https://issues.apache.org/jira/browse/FLINK-26165
 Project: Flink
  Issue Type: Bug
Reporter: Roman Khachatryan








[jira] [Created] (FLINK-26154) SavepointFormatITCase fails on azure

2022-02-15 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26154:
-

 Summary: SavepointFormatITCase fails on azure
 Key: FLINK-26154
 URL: https://issues.apache.org/jira/browse/FLINK-26154
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends, Tests
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


Originally reported in FLINK-26144.

 

[https://dev.azure.com/mapohl/flink/_build/results?buildId=738=logs=0a15d512-44ac-5ba5-97ab-13a5d066c22c=9a028d19-6c4b-5a4e-d378-03fca149d0b1=6340]

 
{code}
Feb 15 01:26:50 [ERROR] Tests run: 16, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 591.027 s <<< FAILURE! - in 
org.apache.flink.test.checkpointing.SavepointFormatITCase
Feb 15 01:26:50 [ERROR] 
org.apache.flink.test.checkpointing.SavepointFormatITCase.testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath(SavepointFormatType,
 StateBackendConfig)[2]  Time elapsed: 261.901 s  <<< ERROR!
Feb 15 01:26:50 java.util.concurrent.TimeoutException: Condition was not met in 
given timeout.
Feb 15 01:26:50 at 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:166)
Feb 15 01:26:50 at 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:144)
Feb 15 01:26:50 at 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:136)
Feb 15 01:26:50 at 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:210)
Feb 15 01:26:50 at 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:184)
Feb 15 01:26:50 at 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:172)
Feb 15 01:26:50 at 
org.apache.flink.test.checkpointing.SavepointFormatITCase.relocateAndVerify(SavepointFormatITCase.java:306)
Feb 15 01:26:50 at 
org.apache.flink.test.checkpointing.SavepointFormatITCase.testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath(SavepointFormatITCase.java:260)
Feb 15 01:26:50 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Feb 15 01:26:50 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Feb 15 01:26:50 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Feb 15 01:26:50 at java.lang.reflect.Method.invoke(Method.java:498)
Feb 15 01:26:50 at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
Feb 15 01:26:50 at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
{code}
 





[jira] [Created] (FLINK-26144) SavepointFormatITCase did NOT fail with changelog.enabled randomized

2022-02-15 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26144:
-

 Summary: SavepointFormatITCase did NOT fail with changelog.enabled 
randomized 
 Key: FLINK-26144
 URL: https://issues.apache.org/jira/browse/FLINK-26144
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


The test validates that the savepoint creates state handles of the correct 
types. For NATIVE savepoints, it expects IncrementalRemoteKeyedStateHandle and 
KeyGroupsStateHandle.

However, with changelog those will be wrapped into ChangelogStateBackendHandle 
and the test fails.

It can be refactored to account for changelog.
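
One possible shape of such a refactoring, as a rough sketch: unwrap the changelog handle before checking the type. All types below are hypothetical stand-ins for the real state handle classes, and the accessor for the wrapped handles is an assumption:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified handle hierarchy; not the actual Flink types.
final class HandleUnwrapSketch {

    interface StateHandle {}

    static final class IncrementalHandle implements StateHandle {}

    static final class KeyGroupsHandle implements StateHandle {}

    // Stand-in for a changelog handle wrapping the materialized handles.
    static final class ChangelogWrapper implements StateHandle {
        final List<StateHandle> materialized;

        ChangelogWrapper(List<StateHandle> materialized) {
            this.materialized = materialized;
        }
    }

    // Accepts the expected handle types either directly or wrapped.
    static boolean isExpectedNativeHandle(StateHandle handle) {
        if (handle instanceof ChangelogWrapper) {
            List<StateHandle> inner = ((ChangelogWrapper) handle).materialized;
            return !inner.isEmpty()
                    && inner.stream().allMatch(HandleUnwrapSketch::isExpectedNativeHandle);
        }
        return handle instanceof IncrementalHandle || handle instanceof KeyGroupsHandle;
    }

    public static void main(String[] args) {
        StateHandle wrapped =
                new ChangelogWrapper(Arrays.<StateHandle>asList(new IncrementalHandle()));
        System.out.println(isExpectedNativeHandle(wrapped));
    }
}
```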

cc: [~dwysakowicz]

Another issue is that the test does NOT fail on master, where 
checkpointing.changelog=random is set in pom.xml, which should result in a call 
to
{code:java}
randomize(conf, ENABLE_STATE_CHANGE_LOG, true, false);{code}
 
If I leave only a single "true" option to randomize(), it does fail. If 
confirmed, I'll open a new ticket for that. cc: [~arvid] 





[jira] [Created] (FLINK-26092) JsonAggregationFunctionsITCase fails with NPE when using RocksDB

2022-02-11 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26092:
-

 Summary: JsonAggregationFunctionsITCase fails with NPE when using 
RocksDB
 Key: FLINK-26092
 URL: https://issues.apache.org/jira/browse/FLINK-26092
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends, Tests
Affects Versions: 1.14.3, 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan


With the RocksDB backend chosen manually (instead of Heap, e.g. by altering the 
mini-cluster configuration in BuiltInAggregateFunctionTestBase), the test fails 
with an NPE.
 
Not sure whether it's a RocksDB issue, a test issue, or not an issue at all.
The current Changelog backend behavior mimics RocksDB, and therefore enabling 
it with materialization fails the test too (Changelog +  Heap).
 
{code}
java.lang.RuntimeException: Could not collect results
    at 
org.apache.flink.table.planner.functions.BuiltInAggregateFunctionTestBase.materializeResult(BuiltInAggregateFunctionTestBase.java:169)
    at 
org.apache.flink.table.planner.functions.BuiltInAggregateFunctionTestBase.assertRows(BuiltInAggregateFunctionTestBase.java:133)
    at 
org.apache.flink.table.planner.functions.BuiltInAggregateFunctionTestBase$SuccessItem.execute(BuiltInAggregateFunctionTestBase.java:279)
    at 
org.apache.flink.table.planner.functions.BuiltInAggregateFunctionTestBase.testFunction(BuiltInAggregateFunctionTestBase.java:93)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runners.Suite.runChild(Suite.java:128)
    at org.junit.runners.Suite.runChild(Suite.java:27)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.RuntimeException: Failed to fetch next result
    at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:216)
    at java.util.Iterator.forEachRemaining(Iterator.java:115)
    at 
org.apache.flink.table.planner.functions.BuiltInAggregateFunctionTestBase.materializeResult(BuiltInAggregateFunctionTestBase.java:150)
    ... 38 more
Caused by: java.io.IOException: Failed to fetch job execution result
    at 

[jira] [Created] (FLINK-26079) Disallow combination of Changelog backend and CLAIM restore mode

2022-02-10 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26079:
-

 Summary: Disallow combination of Changelog backend and CLAIM 
restore mode 
 Key: FLINK-26079
 URL: https://issues.apache.org/jira/browse/FLINK-26079
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration, Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.15.0


Extracted from FLINK-25872.





[jira] [Created] (FLINK-26063) [Changelog] Incorrect key group logged for PQ.poll and remove

2022-02-09 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26063:
-

 Summary: [Changelog] Incorrect key group logged for PQ.poll and 
remove
 Key: FLINK-26063
 URL: https://issues.apache.org/jira/browse/FLINK-26063
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


Key group is logged so that state changes can be re-distributed or shuffled.
It is currently obtained from keyContext during poll() and remove() operations.
However, keyContext is not updated when dequeuing processing time timers.

The impact is relatively small for remove(): in the worst case, the operation 
will be ignored.
poll() should probably be replaced with remove() anyway - see FLINK-26062.

One way to solve this problem is to extract key group from the polled element - 
if it is a timer.
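
A rough sketch of that idea, assuming the polled timer carries its key. The Timer class, the method names, and the simplified hash-modulo assignment are hypothetical; Flink's real key group assignment is more involved:

```java
// Hypothetical, simplified sketch; not the actual Flink classes.
final class KeyGroupFromElementSketch {

    static final class Timer {
        final Object key;
        final long timestamp;

        Timer(Object key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Simplified assignment used only for illustration.
    static int keyGroupOf(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Prefers the key of the polled element over the (possibly stale) context key.
    static int keyGroupToLog(Object polledElement, Object currentContextKey, int maxParallelism) {
        Object key = polledElement instanceof Timer
                ? ((Timer) polledElement).key
                : currentContextKey;
        return keyGroupOf(key, maxParallelism);
    }

    public static void main(String[] args) {
        // The context still points at key "b" while a timer of key "a" is polled.
        int logged = keyGroupToLog(new Timer("a", 10L), "b", 128);
        System.out.println(logged == keyGroupOf("a", 128));
    }
}
```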

cc: [~masteryhx], [~ym], [~yunta]





[jira] [Created] (FLINK-26062) [Changelog] Non-deterministic recovery of PriorityQueue states

2022-02-09 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26062:
-

 Summary: [Changelog] Non-deterministic recovery of PriorityQueue 
states
 Key: FLINK-26062
 URL: https://issues.apache.org/jira/browse/FLINK-26062
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


Currently, InternalPriorityQueue.poll() is logged as a separate operation, 
without specifying the element that has been polled. On recovery, this recorded 
poll() is replayed.

However, this is not deterministic because the order of PQ elements with equal 
priority is not specified. For example, TimerHeapInternalTimer only compares 
timestamps, which are often equal. This results in polling timers from the 
queue in the wrong order, which leads to timers being dropped and never fired.

 

ProcessingTimeWindowCheckpointingITCase.testAggregatingSlidingProcessingTimeWindow
 fails with materialization enabled and using heap state backend (both 
in-memory and fs-based implementations).

 

The proposed solution is to replace the poll operation with a remove operation 
(which is based on equality).
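
A minimal sketch of why replaying a removal by equality is deterministic, using java.util.PriorityQueue and a hypothetical Timer class (not the actual Flink queue or timer types). The comparator only looks at the timestamp, while equals() also considers the key:

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.PriorityQueue;

// Hypothetical, simplified sketch; not the actual Flink classes.
final class PollVsRemoveSketch {

    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer other = (Timer) o;
            return timestamp == other.timestamp && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }
    }

    // Priority is defined by the timestamp only, so ties are possible.
    static PriorityQueue<Timer> newQueue() {
        return new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    }

    // Replays a logged removal: the element is identified by equality,
    // independent of how the restored queue orders equal timestamps.
    static void replayRemove(PriorityQueue<Timer> restored, Timer logged) {
        if (!restored.remove(logged)) {
            throw new IllegalStateException("Logged element not found in restored queue");
        }
    }

    public static void main(String[] args) {
        PriorityQueue<Timer> restored = newQueue();
        restored.add(new Timer("b", 10L));
        restored.add(new Timer("a", 10L));
        replayRemove(restored, new Timer("a", 10L));
        System.out.println(restored.peek().key);
    }
}
```

Replaying a bare poll() on the restored queue would instead return whichever of the two equal-timestamp timers happens to be at the head.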
 
cc: [~masteryhx], [~ym], [~yunta]





[jira] [Created] (FLINK-26019) Changelogged PriorityQueue elements recovered out-of-order

2022-02-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26019:
-

 Summary: Changelogged PriorityQueue elements recovered out-of-order
 Key: FLINK-26019
 URL: https://issues.apache.org/jira/browse/FLINK-26019
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


StateChangeFormat is the class responsible for writing out changelog data.
Each chunk of data is sorted by: logId -> sequenceNumber -> keyGroup.
Sorting by sequenceNumber preserves temporal order.
Sorting by keyGroup a) puts metadata (group -1) at the beginning and b) allows 
the key group (KG) to be written only once.

However, the assumption that the order of changes across groups doesn't matter 
currently doesn't hold: the poll operation of InternalPriorityQueue may affect 
any group (the smallest item across all groups so far will be polled).

This results in wrong processing time timers being removed on recovery in 
ProcessingTimeWindowCheckpointingITCase#testAggregatingSlidingProcessingTimeWindow

One way to solve this problem is to simply disable KG-sorting and grouping 
(only output metadata at the beginning). 
The other one is to associate the polled element with the correct key group 
while logging changes.

Both ways should work with re-scaling.
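
To illustrate the reordering, here is a small sketch of the sort order described above (logId -> sequenceNumber -> keyGroup). The Change class and its fields are hypothetical and do not mirror StateChangeFormat; the point is only that two changes with the same sequence number end up ordered by key group rather than by the order in which they were logged:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified sketch; not the actual StateChangeFormat code.
final class ChangeOrderingSketch {

    static final class Change {
        final String logId;
        final long sequenceNumber;
        final int keyGroup;
        final String op;

        Change(String logId, long sequenceNumber, int keyGroup, String op) {
            this.logId = logId;
            this.sequenceNumber = sequenceNumber;
            this.keyGroup = keyGroup;
            this.op = op;
        }
    }

    // logId -> sequenceNumber -> keyGroup, as described in the ticket.
    static final Comparator<Change> ORDER =
            Comparator.comparing((Change c) -> c.logId)
                    .thenComparingLong(c -> c.sequenceNumber)
                    .thenComparingInt(c -> c.keyGroup);

    // Returns the operations in the order in which they would be written out.
    static List<String> sortedOps(List<Change> changes) {
        List<Change> copy = new ArrayList<>(changes);
        copy.sort(ORDER);
        List<String> ops = new ArrayList<>();
        for (Change c : copy) {
            ops.add(c.op);
        }
        return ops;
    }

    public static void main(String[] args) {
        List<Change> logged = new ArrayList<>();
        logged.add(new Change("log-1", 1L, -1, "metadata"));
        logged.add(new Change("log-1", 1L, 5, "add timer"));
        logged.add(new Change("log-1", 1L, 2, "poll"));
        // prints [metadata, poll, add timer]: the poll now precedes the add
        System.out.println(sortedOps(logged));
    }
}
```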

cc: [~masteryhx], [~ym], [~yunta]





[jira] [Created] (FLINK-25992) JobDispatcherITCase..testRecoverFromCheckpointAfterLosingAndRegainingLeadership fails on azure

2022-02-07 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25992:
-

 Summary: 
JobDispatcherITCase..testRecoverFromCheckpointAfterLosingAndRegainingLeadership 
fails on azure
 Key: FLINK-25992
 URL: https://issues.apache.org/jira/browse/FLINK-25992
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30871=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9154

{code}
19:41:35,515 [flink-akka.actor.default-dispatcher-9] WARN  
org.apache.flink.runtime.taskmanager.Task[] - jobVertex 
(1/1)#0 (7efdea21f5f95490e02117063ce8a314) switched from RUNNING to FAILED with 
failure cause: java.lang.RuntimeException: Error while notify checkpoint ABORT.
at 
org.apache.flink.runtime.taskmanager.Task.notifyCheckpoint(Task.java:1457)
at 
org.apache.flink.runtime.taskmanager.Task.notifyCheckpointAborted(Task.java:1407)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.abortCheckpoint(TaskExecutor.java:1021)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.UnsupportedOperationException: notifyCheckpointAbortAsync 
not supported by 
org.apache.flink.runtime.dispatcher.JobDispatcherITCase$AtLeastOneCheckpointInvokable
at 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable.notifyCheckpointAbortAsync(AbstractInvokable.java:205)
at 
org.apache.flink.runtime.taskmanager.Task.notifyCheckpoint(Task.java:1430)
... 31 more

{code}
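
The pattern behind the trace can be sketched as follows, assuming a base class whose default implementation rejects abort notifications. The classes and the single-argument signature are simplified, hypothetical stand-ins for AbstractInvokable and the test invokable, and overriding the method is only one possible way to make a test invokable tolerate the notification:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical, simplified sketch; not the actual Flink classes or signatures.
final class AbortNotificationSketch {

    abstract static class BaseInvokable {
        // Default behaviour: reject the notification.
        Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
            throw new UnsupportedOperationException(
                    "notifyCheckpointAbortAsync not supported by " + getClass().getName());
        }
    }

    // Inherits the throwing default, like the invokable in the trace.
    static final class StrictInvokable extends BaseInvokable {}

    // Accepts the notification and ignores it.
    static final class TolerantInvokable extends BaseInvokable {
        @Override
        Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }
    }

    public static void main(String[] args) {
        System.out.println(new TolerantInvokable().notifyCheckpointAbortAsync(1L).isDone());
        try {
            new StrictInvokable().notifyCheckpointAbortAsync(1L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```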





[jira] [Created] (FLINK-25987) IllegalArgumentException thrown from FsStateChangelogWriter.truncate

2022-02-07 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25987:
-

 Summary: IllegalArgumentException thrown from 
FsStateChangelogWriter.truncate
 Key: FLINK-25987
 URL: https://issues.apache.org/jira/browse/FLINK-25987
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


{code}
java.lang.IllegalArgumentException
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
at 
org.apache.flink.changelog.fs.FsStateChangelogWriter.truncate(FsStateChangelogWriter.java:278)
at 
org.apache.flink.state.changelog.ChangelogKeyedStateBackend.updateChangelogSnapshotState(ChangelogKeyedStateBackend.java:702)
at 
org.apache.flink.state.changelog.PeriodicMaterializationManager.lambda$null$2(PeriodicMaterializationManager.java:163)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750){code}






[jira] [Created] (FLINK-25871) "License Check failed" on AZP

2022-01-28 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25871:
-

 Summary: "License Check failed" on AZP
 Key: FLINK-25871
 URL: https://issues.apache.org/jira/browse/FLINK-25871
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30405=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=25983]

 
{code}
2022-01-28T16:00:43.2845037Z Invoking mvn with 'mvn 
-Dmaven.repo.local=/__w/1/.m2/repository -Dmaven.wagon.http.pool=false 
-Dorg.slf4j.simpleLogger.showDateTime=true 
-Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss.SSS 
-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
 --no-snapshot-updates -B -Dhadoop.version=2.8.5 -Dinclude_hadoop_aws 
-Dscala-2.12  --settings /__w/1/s/tools/ci/google-mirror-settings.xml   
exec:java 
-Dexec.mainClass=org.apache.flink.tools.ci.licensecheck.LicenseChecker 
-Dexec.args="/tmp/clean_compile.out /__w/1/s /tmp/flink-validation-deployment"'
2022-01-28T16:00:44.2455332Z [INFO] Scanning for projects...
2022-01-28T16:00:45.4125838Z [INFO] 

2022-01-28T16:00:45.4127042Z [INFO] 

2022-01-28T16:00:45.4134357Z [INFO] Building Flink : Tools : CI : Java 
1.15-SNAPSHOT
2022-01-28T16:00:45.4135024Z [INFO] 

2022-01-28T16:00:45.6475421Z [INFO] 
2022-01-28T16:00:45.6482266Z [INFO] --- exec-maven-plugin:3.0.0:java 
(default-cli) @ java-ci-tools ---
2022-01-28T16:00:46.4578710Z 16:00:46,454 WARN  
org.apache.flink.tools.ci.licensecheck.LicenseChecker[] - THIS UTILITY 
IS ONLY CHECKING FOR COMMON LICENSING MISTAKES. A MANUAL CHECK OF THE NOTICE 
FILES, DEPLOYED ARTIFACTS, ETC. IS STILL NEEDED!
2022-01-28T16:00:46.4654764Z 16:00:46,464 DEBUG 
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Loaded 21 
items from resource modules-skipping-deployment.modulelist
2022-01-28T16:00:46.4664700Z 16:00:46,465 DEBUG 
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Loaded 7 
items from resource modules-defining-excess-dependencies.modulelist
2022-01-28T16:00:50.3720798Z 16:00:50,370 INFO  
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Extracted 45 
modules with a total of 649 dependencies
2022-01-28T16:00:51.8055495Z 16:00:51,804 INFO  
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Found 43 
NOTICE files to check
2022-01-28T16:00:51.9879879Z 16:00:51,984 DEBUG 
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency 
org.jodd:jodd-core:3.5.2 is mentioned in NOTICE file 
/__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE,
 but was not mentioned by the build output as a bundled dependency
2022-01-28T16:00:51.9886753Z 16:00:51,985 DEBUG 
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency 
org.apache.hive:hive-storage-api:2.7.0 is mentioned in NOTICE file 
/__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE,
 but was not mentioned by the build output as a bundled dependency
2022-01-28T16:00:51.9913405Z 16:00:51,985 DEBUG 
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency 
org.objenesis:objenesis:2.1 is mentioned in NOTICE file 
/__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE,
 but was not mentioned by the build output as a bundled dependency
2022-01-28T16:00:51.9929357Z 16:00:51,986 DEBUG 
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency 
org.apache.hive.shims:hive-shims-common:3.1.2 is mentioned in NOTICE file 
/__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE,
 but was not mentioned by the build output as a bundled dependency
2022-01-28T16:00:51.9932281Z 16:00:51,986 DEBUG 
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency 
org.apache.hive:hive-common:3.1.2 is mentioned in NOTICE file 
/__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE,
 but was not mentioned by the build output as a bundled dependency
2022-01-28T16:00:51.9934561Z 16:00:51,986 DEBUG 
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency 
org.apache.hive:hive-serde:3.1.2 is mentioned in NOTICE file 
/__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE,
 but was not mentioned by the build output as a bundled dependency
2022-01-28T16:00:51.9936752Z 16:00:51,986 DEBUG 
org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency 

[jira] [Created] (FLINK-25867) [ZH] Add ChangelogBackend documentation

2022-01-28 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25867:
-

 Summary: [ZH] Add ChangelogBackend documentation
 Key: FLINK-25867
 URL: https://issues.apache.org/jira/browse/FLINK-25867
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


Currently, changelog backend is hidden from users documentation-wise.

Once the feature is ready, the following needs to be documented:
 * General description (page 
[https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/]
 )
 * Configuration (page 
[https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/] 
- StateChangelogOptions, FsStateChangelogOptions)
 * Uploader metrics (page 
[https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/] , see 
FLINK-23486)





[jira] [Created] (FLINK-25850) Consider notifying nested state backend about checkpoint abortion

2022-01-27 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25850:
-

 Summary: Consider notifying nested state backend about checkpoint 
abortion
 Key: FLINK-25850
 URL: https://issues.apache.org/jira/browse/FLINK-25850
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.16.0


The notification is optional, but some backends might do GC upon receiving it.
{code}
These notifications are "best effort", meaning they can sometimes be skipped.
This method is very rarely necessary to implement.
{code}

The usefulness is also limited by:
- low probability of notification reaching backend because of the difference in 
intervals and cleanup on checkpoint completion
- low probability of backends making good use of it because it's delivered 
after snapshot is done; and backends must be resilient to missing notifications

There is added complexity and risk (such as FLINK-25816).
Probably, complexity can be eliminated by extracting some Notifier class from 
ChangelogStateBackend.
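
A rough sketch of what such an extracted class could look like, assuming a best-effort forwarder that swallows failures of the nested backend. The interface, class, and method names are hypothetical, and the sketch ignores any translation between checkpoint IDs and the IDs known to the nested backend:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch; not the actual Flink interfaces.
final class AbortNotifierSketch {

    interface AbortListener {
        void notifyCheckpointAborted(long checkpointId) throws Exception;
    }

    static final class Notifier {
        private final AbortListener nested;

        Notifier(AbortListener nested) {
            this.nested = nested;
        }

        // Best effort: returns false instead of failing when the nested
        // listener throws (a design assumption of this sketch).
        boolean forwardAbort(long checkpointId) {
            try {
                nested.notifyCheckpointAborted(checkpointId);
                return true;
            } catch (Exception e) {
                return false;
            }
        }
    }

    public static void main(String[] args) {
        List<Long> seen = new ArrayList<>();
        Notifier notifier = new Notifier(seen::add);
        System.out.println(notifier.forwardAbort(42L) + " " + seen);
    }
}
```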

cc: [~yunta]





[jira] [Created] (FLINK-25842) [v2] FLIP-158: Generalized incremental checkpoints

2022-01-27 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25842:
-

 Summary: [v2] FLIP-158: Generalized incremental checkpoints
 Key: FLINK-25842
 URL: https://issues.apache.org/jira/browse/FLINK-25842
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.16.0


Umbrella ticket for the 2nd iteration of [FLIP-158: Generalized incremental 
checkpoints|https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints]
 





[jira] [Created] (FLINK-25825) MySqlCatalogITCase fails on azure

2022-01-26 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25825:
-

 Summary: MySqlCatalogITCase fails on azure
 Key: FLINK-25825
 URL: https://issues.apache.org/jira/browse/FLINK-25825
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30189=logs=e9af9cde-9a65-5281-a58e-2c8511d36983=c520d2c3-4d17-51f1-813b-4b0b74a0c307=13677
 
{code}
2022-01-26T06:04:42.8019913Z Jan 26 06:04:42 [ERROR] 
org.apache.flink.connector.jdbc.catalog.MySqlCatalogITCase.testFullPath  Time 
elapsed: 2.166 s  <<< FAILURE!
2022-01-26T06:04:42.8025522Z Jan 26 06:04:42 java.lang.AssertionError: 
expected: java.util.ArrayList<[+I[1, -1, 1, null, true, null, hello, 
2021-08-04, 2021-08-04T01:54:16, -1, 1, -1.0, 1.0, enum2, -9.1, 9.1, -1, 1, -1, 1, 
\{"k1": "v1"}, null, col_longtext, null, -1, 1, col_mediumtext, -99, 99, -1.0, 
1.0, set_ele1, -1, 1, col_text, 10:32:34, 2021-08-04T01:54:16, col_tinytext, 
-1, 1, null, col_varchar, 2021-08-04T01:54:16.463, 09:33:43, 
2021-08-04T01:54:16.463, null], +I[2, -1, 1, null, true, null, hello, 
2021-08-04, 2021-08-04T01:53:19, -1, 1, -1.0, 1.0, enum2, -9.1, 9.1, -1, 1, 
-1, 1, \{"k1": "v1"}, null, col_longtext, null, -1, 1, col_mediumtext, -99, 99, 
-1.0, 1.0, set_ele1,set_ele12, -1, 1, col_text, 10:32:34, 2021-08-04T01:53:19, 
col_tinytext, -1, 1, null, col_varchar, 2021-08-04T01:53:19.098, 09:33:43, 
2021-08-04T01:53:19.098, null]]> but was: java.util.ArrayList<[+I[1, -1, 1, 
null, true, null, hello, 2021-08-04, 2021-08-04T01:54:16, -1, 1, -1.0, 1.0, 
enum2, -9.1, 9.1, -1, 1, -1, 1, \{"k1": "v1"}, null, col_longtext, null, -1, 
1, col_mediumtext, -99, 99, -1.0, 1.0, set_ele1, -1, 1, col_text, 10:32:34, 
2021-08-04T01:54:16, col_tinytext, -1, 1, null, col_varchar, 
2021-08-04T01:54:16.463, 09:33:43, 2021-08-04T01:54:16.463, null], +I[2, -1, 1, 
null, true, null, hello, 2021-08-04, 2021-08-04T01:53:19, -1, 1, -1.0, 1.0, 
enum2, -9.1, 9.1, -1, 1, -1, 1, \{"k1": "v1"}, null, col_longtext, null, -1, 1, 
col_mediumtext, -99, 99, -1.0, 1.0, set_ele1,set_ele12, -1, 1, col_text, 
10:32:34, 2021-08-04T01:53:19, col_tinytext, -1, 1, null, col_varchar, 
2021-08-04T01:53:19.098, 09:33:43, 2021-08-04T01:53:19.098, null]]>
2022-01-26T06:04:42.8029336Z Jan 26 06:04:42    at 
org.junit.Assert.fail(Assert.java:89)
2022-01-26T06:04:42.8029824Z Jan 26 06:04:42    at 
org.junit.Assert.failNotEquals(Assert.java:835)
2022-01-26T06:04:42.8030319Z Jan 26 06:04:42    at 
org.junit.Assert.assertEquals(Assert.java:120)
2022-01-26T06:04:42.8030815Z Jan 26 06:04:42    at 
org.junit.Assert.assertEquals(Assert.java:146)
2022-01-26T06:04:42.8031419Z Jan 26 06:04:42    at 
org.apache.flink.connector.jdbc.catalog.MySqlCatalogITCase.testFullPath(MySqlCatalogITCase.java:306)
{code}
 
{code}
2022-01-26T06:04:43.2899378Z Jan 26 06:04:43 [ERROR] Failures:
2022-01-26T06:04:43.2907942Z Jan 26 06:04:43 [ERROR]   
MySqlCatalogITCase.testFullPath:306 expected: java.util.ArrayList<[+I[1, -1, 1, 
null, true,
2022-01-26T06:04:43.2914065Z Jan 26 06:04:43 [ERROR]   
MySqlCatalogITCase.testGetTable:253 expected:<(
2022-01-26T06:04:43.2983567Z Jan 26 06:04:43 [ERROR]   
MySqlCatalogITCase.testSelectToInsert:323 expected: java.util.ArrayList<[+I[1, 
-1, 1, null,
2022-01-26T06:04:43.2997373Z Jan 26 06:04:43 [ERROR]   
MySqlCatalogITCase.testWithoutCatalog:291 expected: java.util.ArrayList<[+I[1, 
-1, 1, null,
2022-01-26T06:04:43.3010450Z Jan 26 06:04:43 [ERROR]   
MySqlCatalogITCase.testWithoutCatalogDB:278 expected: 
java.util.ArrayList<[+I[1, -1, 1, nul
{code}
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25824) E2e test phase fails on AZP after "Unable to locate package moreutils"

2022-01-26 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25824:
-

 Summary: E2e test phase fails on AZP after "Unable to locate 
package moreutils"
 Key: FLINK-25824
 URL: https://issues.apache.org/jira/browse/FLINK-25824
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure
Reporter: Roman Khachatryan
 Fix For: 1.15.0


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30209&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=b2642e3a-5b86-574d-4c8a-f7e2842bfb14&l=17]

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30209&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=160c9ae5-96fd-516e-1c91-deb81f59292a&l=17]

 
{code:java}
E: Unable to locate package moreutils
Running command 'flink-end-to-end-tests/run-nightly-tests.sh 1' with a timeout 
of 287 minutes.
./tools/azure-pipelines/uploading_watchdog.sh: line 76: ts: command not found
/home/vsts/work/1/s/flink-end-to-end-tests/../tools/ci/maven-utils.sh: line 96: 
NPM_PROXY_PROFILE_ACTIVATION: command not found
The STDIO streams did not close within 10 seconds of the exit event from 
process '/usr/bin/bash'. This may indicate a child process inherited the STDIO 
streams and has not yet exited.
##[error]Bash exited with code '141'. {code}
 





[jira] [Created] (FLINK-25750) Performance regression on 20.01.2021 in globalWindow and stateBackend benchmarks

2022-01-21 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25750:
-

 Summary: Performance regression on 20.01.2021 in globalWindow and 
stateBackend benchmarks
 Key: FLINK-25750
 URL: https://issues.apache.org/jira/browse/FLINK-25750
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks, Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=globalWindow&env=2&revs=200&equid=off&quarts=on&extr=on
http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=stateBackends.FS&env=2&revs=200&equid=off&quarts=on&extr=on
http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=stateBackends.FS&env=2&revs=200&equid=off&quarts=on&extr=on





[jira] [Created] (FLINK-25740) PulsarSourceOrderedE2ECase fails on azure

2022-01-20 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25740:
-

 Summary: PulsarSourceOrderedE2ECase fails on azure
 Key: FLINK-25740
 URL: https://issues.apache.org/jira/browse/FLINK-25740
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.15.0
Reporter: Roman Khachatryan


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29789&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=160c9ae5-96fd-516e-1c91-deb81f59292a&l=16385

{code}
2022-01-20T15:39:52.3915823Z Jan 20 15:39:52 [ERROR] Errors:
2022-01-20T15:39:52.3922501Z Jan 20 15:39:52 [ERROR]   
PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testIdleReader:187->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence
2022-01-20T15:39:52.3924207Z Jan 20 15:39:52 [ERROR]   
PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testIdleReader:187->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence
2022-01-20T15:39:52.3925830Z Jan 20 15:39:52 [ERROR]   
PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testMultipleSplits:145->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence
2022-01-20T15:39:52.3927464Z Jan 20 15:39:52 [ERROR]   
PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testMultipleSplits:145->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence
2022-01-20T15:39:52.3928743Z Jan 20 15:39:52 [ERROR]   
PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testSourceSingleSplit:105->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence
2022-01-20T15:39:52.3930029Z Jan 20 15:39:52 [ERROR]   
PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testSourceSingleSplit:105->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence
2022-01-20T15:39:52.3931359Z Jan 20 15:39:52 [ERROR]   
PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testTaskManagerFailure:232 » 
BrokerPersistence
2022-01-20T15:39:52.3932353Z Jan 20 15:39:52 [ERROR]   
PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testTaskManagerFailure:232 » 
BrokerPersistence
2022-01-20T15:39:52.3933580Z Jan 20 15:39:52 [ERROR]   
PulsarSourceUnorderedE2ECase>UnorderedSourceTestSuiteBase.testOneSplitWithMultipleConsumers:60 » BrokerPersistence
2022-01-20T15:39:52.3934698Z Jan 20 15:39:52 [ERROR]   
PulsarSourceUnorderedE2ECase>UnorderedSourceTestSuiteBase.testOneSplitWithMultipleConsumers:60 » BrokerPersistence
{code}

{code}
2022-01-20T15:28:37.1467261Z Jan 20 15:28:37 [ERROR] 
org.apache.flink.tests.util.pulsar.PulsarSourceUnorderedE2ECase.testOneSplitWithMultipleConsumers(TestEnvironment,
 ExternalContext)[2]  Time elapsed: 77.698 s  <<< ERROR!
2022-01-20T15:28:37.1469146Z Jan 20 15:28:37 
org.apache.pulsar.client.api.PulsarClientException$BrokerPersistenceException: 
org.apache.bookkeeper.mledger.ManagedLedgerException: Not enough non-faulty 
bookies available
2022-01-20T15:28:37.1470062Z Jan 20 15:28:37    at 
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:985)
2022-01-20T15:28:37.1470802Z Jan 20 15:28:37    at 
org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:95)
2022-01-20T15:28:37.1471598Z Jan 20 15:28:37    at 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.sendMessages(PulsarRuntimeOperator.java:172)
2022-01-20T15:28:37.1472451Z Jan 20 15:28:37    at 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.sendMessages(PulsarRuntimeOperator.java:167)
2022-01-20T15:28:37.1473307Z Jan 20 15:28:37    at 
org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter.writeRecords(PulsarPartitionDataWriter.java:41)
2022-01-20T15:28:37.1474209Z Jan 20 15:28:37    at 
org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase.testOneSplitWithMultipleConsumers(UnorderedSourceTestSuiteBase.java:60)
2022-01-20T15:28:37.1474949Z Jan 20 15:28:37    at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-01-20T15:28:37.1475658Z Jan 20 15:28:37    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-01-20T15:28:37.1476383Z Jan 20 15:28:37    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-01-20T15:28:37.1477030Z Jan 20 15:28:37    at 
java.lang.reflect.Method.invoke(Method.java:498)
2022-01-20T15:28:37.1477670Z Jan 20 15:28:37    at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
2022-01-20T15:28:37.1478388Z Jan 20 15:28:37    at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
{code}






[jira] [Created] (FLINK-25739) Include dstl-dfs into distribution (opt/)

2022-01-20 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25739:
-

 Summary: Include dstl-dfs into distribution (opt/)
 Key: FLINK-25739
 URL: https://issues.apache.org/jira/browse/FLINK-25739
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0








[jira] [Created] (FLINK-25710) Multiple Kafka IT cases fail with "ContainerLaunch Container startup failed"

2022-01-19 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25710:
-

 Summary: Multiple Kafka IT cases fail with "ContainerLaunch 
Container startup failed"
 Key: FLINK-25710
 URL: https://issues.apache.org/jira/browse/FLINK-25710
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Tests
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29731&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c&l=35454
{code}
2022-01-19T18:17:40.3503774Z Jan 19 18:17:40 [INFO] 
---
2022-01-19T18:17:42.3992027Z Jan 19 18:17:42 [ERROR] Picked up 
JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError
2022-01-19T18:17:42.9262342Z Jan 19 18:17:42 [INFO] Running 
org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase
2022-01-19T18:18:47.9992530Z Jan 19 18:18:47 [ERROR] Tests run: 1, Failures: 0, 
Errors: 1, Skipped: 0, Time elapsed: 65.053 s <<< FAILURE! - in 
org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase
2022-01-19T18:18:47.9993836Z Jan 19 18:18:47 [ERROR] 
org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase  Time elapsed: 
65.053 s  <<<  ERROR!
2022-01-19T18:18:47.9994507Z Jan 19 18:18:47 
org.testcontainers.containers.ContainerLaunchException: Container startup failed
...
2022-01-19T18:18:48.0038449Z Jan 19 18:18:47 Caused by: 
org.rnorth.ducttape.RetryCountExceededException: Retry limit hit with exception
2022-01-19T18:18:48.0039451Z Jan 19 18:18:47    at 
org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:88)
2022-01-19T18:18:48.0040449Z Jan 19 18:18:47    at 
org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:329)
2022-01-19T18:18:48.0041204Z Jan 19 18:18:47    ... 27 more
2022-01-19T18:18:48.0041993Z Jan 19 18:18:47 Caused by: 
org.testcontainers.containers.ContainerLaunchException: Could not create/start 
container
2022-01-19T18:18:48.0043007Z Jan 19 18:18:47    at 
org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:525)
2022-01-19T18:18:48.0044020Z Jan 19 18:18:47    at 
org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:331)
2022-01-19T18:18:48.0045158Z Jan 19 18:18:47    at 
org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
2022-01-19T18:18:48.0046043Z Jan 19 18:18:47    ... 28 more
2022-01-19T18:18:48.0047026Z Jan 19 18:18:47 Caused by: 
org.testcontainers.containers.ContainerLaunchException: Timed out waiting for 
container port to open (172.17.0.1 ports: [56218, 56219] should be listening)
2022-01-19T18:18:48.0048320Z Jan 19 18:18:47    at 
org.testcontainers.containers.wait.strategy.HostPortWaitStrategy.waitUntilReady(HostPortWaitStrategy.java:90)
2022-01-19T18:18:48.0049465Z Jan 19 18:18:47    at 
org.testcontainers.containers.wait.strategy.AbstractWaitStrategy.waitUntilReady(AbstractWaitStrategy.java:51)
2022-01-19T18:18:48.0050585Z Jan 19 18:18:47    at 
org.testcontainers.containers.GenericContainer.waitUntilContainerStarted(GenericContainer.java:929)
2022-01-19T18:18:48.0051628Z Jan 19 18:18:47    at 
org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:468)
2022-01-19T18:18:48.0052380Z Jan 19 18:18:47    ... 30 more

...

2022-01-19T18:40:37.7197924Z Jan 19 18:40:37 [INFO] Results:
2022-01-19T18:40:37.7198526Z Jan 19 18:40:37 [INFO]
2022-01-19T18:40:37.7199093Z Jan 19 18:40:37 [ERROR] Errors:
2022-01-19T18:40:37.7200602Z Jan 19 18:40:37 [ERROR]   KafkaSinkITCase » 
ContainerLaunch Container startup failed
2022-01-19T18:40:37.7201683Z Jan 19 18:40:37 [ERROR]   
KafkaTransactionLogITCase » ContainerLaunch Container startup failed
2022-01-19T18:40:37.7204632Z Jan 19 18:40:37 [ERROR]   
KafkaWriterITCase.beforeAll:99 » ContainerLaunch Container startup failed

{code}






[jira] [Created] (FLINK-25678) TaskExecutorStateChangelogStoragesManager.shutdown is not thread-safe

2022-01-17 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25678:
-

 Summary: TaskExecutorStateChangelogStoragesManager.shutdown is not 
thread-safe
 Key: FLINK-25678
 URL: https://issues.apache.org/jira/browse/FLINK-25678
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.2, 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0, 1.14.4


[https://github.com/apache/flink/pull/18169#discussion_r785741977]

The method is called from the shutdown hook and therefore should be thread-safe.
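
A minimal sketch of what a thread-safe, idempotent shutdown could look like. The class below is hypothetical (it is not the actual TaskExecutorStateChangelogStoragesManager), and the lock plus closed flag is only one possible way to do it:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical manager used for illustration only. */
class StoragesManagerSketch {
    private final Object lock = new Object();
    private final Map<String, AutoCloseable> storagesByJob = new HashMap<>();
    private boolean closed;

    void register(String jobId, AutoCloseable storage) {
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException("manager is already shut down");
            }
            storagesByJob.put(jobId, storage);
        }
    }

    /** May be called concurrently, e.g. by the owning thread and a JVM shutdown hook. */
    void shutdown() {
        List<AutoCloseable> toClose;
        synchronized (lock) {
            if (closed) {
                return; // a second caller has nothing left to do
            }
            closed = true;
            toClose = new ArrayList<>(storagesByJob.values());
            storagesByJob.clear();
        }
        // Close outside the lock so a slow close() does not block other callers.
        for (AutoCloseable storage : toClose) {
            try {
                storage.close();
            } catch (Exception e) {
                System.err.println("Failed to close storage: " + e);
            }
        }
    }
}
```

With this shape, each registered storage is closed at most once, no matter how many threads call shutdown().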

cc: [~Zakelly] , [~dmvk] 





[jira] [Created] (FLINK-25598) Changelog materialized state discarded on failure

2022-01-10 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25598:
-

 Summary: Changelog materialized state discarded on failure
 Key: FLINK-25598
 URL: https://issues.apache.org/jira/browse/FLINK-25598
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


Similar to FLINK-25395:

Error handling in {{PeriodicMaterializationManager.uploadSnapshot}} discards 
the uploaded state;

however, the wrapped backend might assume the state was uploaded.

cc: [~ym], [~yunta]





[jira] [Created] (FLINK-25395) Incremental shared state might be discarded by TM

2021-12-20 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25395:
-

 Summary: Incremental shared state might be discarded by TM
 Key: FLINK-25395
 URL: https://issues.apache.org/jira/browse/FLINK-25395
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


Extracting from [FLINK-25185 
discussion|https://issues.apache.org/jira/browse/FLINK-25185?focusedCommentId=17462639&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17462639]

On checkpoint abortion or any failure in AsyncCheckpointRunnable,
it discards the state, in particular shared (incremental) state.

Since FLINK-24611, this creates a problem because shared state can be re-used 
for future checkpoints. 

Needs confirmation.





[jira] [Created] (FLINK-25261) Changelog not truncated on materialization

2021-12-11 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25261:
-

 Summary: Changelog not truncated on materialization
 Key: FLINK-25261
 URL: https://issues.apache.org/jira/browse/FLINK-25261
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


[https://github.com/apache/flink/blob/dcc4d43e413b20f70036e73c61d52e2e1c5afee7/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java#L640]





[jira] [Created] (FLINK-25260) Recovery fails when using changelog+s3+presto

2021-12-11 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25260:
-

 Summary: Recovery fails when using changelog+s3+presto
 Key: FLINK-25260
 URL: https://issues.apache.org/jira/browse/FLINK-25260
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem, Runtime / State Backends
Reporter: Roman Khachatryan


Recovery succeeds if using local FS or hadoop S3 plugin, but fails with Presto:
{code}
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
at 
org.apache.flink.changelog.fs.StateChangeFormat$1.readChange(StateChangeFormat.java:138)
at 
org.apache.flink.changelog.fs.StateChangeFormat$1.next(StateChangeFormat.java:129)
at 
org.apache.flink.changelog.fs.StateChangeFormat$1.next(StateChangeFormat.java:98)
at 
org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.next(StateChangelogHandleStreamHandleReader.java:76)
at 
org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.next(StateChangelogHandleStreamHandleReader.java:61)
at 
org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:94)
at 
org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:74)
at 
org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
at 
org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
{code}


This is likely caused by some intermediate buffers





[jira] [Created] (FLINK-25144) Manual test

2021-12-02 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25144:
-

 Summary: Manual test
 Key: FLINK-25144
 URL: https://issues.apache.org/jira/browse/FLINK-25144
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.15.0


Test plan: 
[https://docs.google.com/document/d/10WVFA0BSR0zrRKRjQbB3iEiC7167MBeAwD6swn_eT24/edit?usp=sharing]

 

The ticket should be split into multiple ones when needed (correctness, 
performance, usability - see the plan above).

 

cc: [~yunta] , [~ym]





[jira] [Created] (FLINK-25143) Add ITCase for periodic materialization

2021-12-02 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25143:
-

 Summary: Add ITCase for periodic materialization
 Key: FLINK-25143
 URL: https://issues.apache.org/jira/browse/FLINK-25143
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Tests
Reporter: Roman Khachatryan
 Fix For: 1.15.0








[jira] [Created] (FLINK-25024) Add ChangelogBackend documentation

2021-11-23 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25024:
-

 Summary: Add ChangelogBackend documentation
 Key: FLINK-25024
 URL: https://issues.apache.org/jira/browse/FLINK-25024
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Roman Khachatryan
 Fix For: 1.15.0


Currently, the changelog backend is hidden from users in the documentation.

Once the feature is ready, the following needs to be documented:
 * General description (page 
[https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/])
 * Configuration (page 
[https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/], 
StateChangelogOptions, FsStateChangelogOptions)
 * Uploader metrics (page 
[https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/], see 
FLINK-23486)





[jira] [Created] (FLINK-24938) Checkpoint cleaner is closed before checkpoints are discarded

2021-11-17 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-24938:
-

 Summary: Checkpoint cleaner is closed before checkpoints are 
discarded
 Key: FLINK-24938
 URL: https://issues.apache.org/jira/browse/FLINK-24938
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0, 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0, 1.14.1


When CompletedCheckpointStore shuts down, it tries to discard some checkpoints 
using CheckpointsCleaner. Since FLINK-23647, the latter is closed 
asynchronously, and before the store is shut down.

This is visible as a warning when running 
ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsZookeeper:

{code}
2021-11-17 10:47:10,599 Fail to remove checkpoint during shutdown. 
[DefaultCompletedCheckpointStore flink-akka.actor.default-dispatcher-5]
 java.lang.IllegalStateException: CheckpointsCleaner has already been closed
 at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
~[classes/:?]
 at 
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.incrementNumberOfCheckpointsToClean(CheckpointsCleaner.java:105)
 ~[classes/:?]
 at 
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.cleanup(CheckpointsCleaner.java:87)
 ~[classes/:?]
 at 
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.cleanCheckpoint(CheckpointsCleaner.java:62)
 ~[classes/:?]
 at 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.tryRemoveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:2
 at 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.shutdown(DefaultCompletedCheckpointStore.java:172)
 ~[classes/:?]
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.shutDownCheckpointServices(SchedulerBase.java:222)
 ~[classes/:?]

{code}

But the test still passes.


cc: [~pnowojski]





[jira] [Created] (FLINK-24903) AdaptiveSchedulerTest.testJobStatusListenerNotifiedOfJobStatusChanges unstable

2021-11-15 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-24903:
-

 Summary: 
AdaptiveSchedulerTest.testJobStatusListenerNotifiedOfJobStatusChanges unstable
 Key: FLINK-24903
 URL: https://issues.apache.org/jira/browse/FLINK-24903
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


[https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1225&view=logs&j=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267&t=511d2595-ec54-5ab7-86ce-92f328796f20&l=7753]

 

Locally, it fails ~14 runs out of 100 (when running only 
testJobStatusListenerNotifiedOfJobStatusChanges in a loop).

 

It looks like the job termination future is always completed before the 
jobStatusChangeListener is notified (in AdaptiveScheduler.transitionToState, 
targetState.getState() completes the future).

Sleeping for 1ms before checking the assertion prevents the failure.
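
As an alternative to a fixed sleep, a test could wait for the notification itself. The sketch below only models the ordering described above with plain futures; the class and method names are hypothetical and nothing here is taken from AdaptiveScheduler or its test:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of "termination future completes before the listener is notified". */
class ListenerOrderingSketch {
    static String awaitNotifiedStatus() {
        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        CompletableFuture<String> notifiedStatus = new CompletableFuture<>();

        Thread scheduler = new Thread(() -> {
            terminationFuture.complete(null);    // completed first, as observed in the ticket
            notifiedStatus.complete("FINISHED"); // the listener is notified only afterwards
        });
        scheduler.start();

        terminationFuture.join();
        // Inspecting listener state right here would be racy, because the
        // notification may not have happened yet. Block on it instead.
        return notifiedStatus.join();
    }

    public static void main(String[] args) {
        System.out.println(awaitNotifiedStatus()); // prints FINISHED
    }
}
```

Whether the real test can expose such a future from its listener is an assumption; the point is only that the assertion should be ordered after the notification rather than after the termination future.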

 

cc: [~trohrmann] 





[jira] [Created] (FLINK-24864) Release TaskManagerJobMetricGroup with the last slot rather than task

2021-11-10 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-24864:
-

 Summary: Release TaskManagerJobMetricGroup with the last slot 
rather than task
 Key: FLINK-24864
 URL: https://issues.apache.org/jira/browse/FLINK-24864
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Metrics, Runtime / State Backends
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


[https://docs.google.com/document/d/1k5WkWIYzs3n3GYQC76H9BLGxvN3wuq7qUHJuBPR9YX0/edit?usp=sharing]
 





[jira] [Created] (FLINK-24826) Performance regression in HeapState benchmarks on Nov 8 2021

2021-11-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-24826:
-

 Summary: Performance regression in HeapState benchmarks on Nov 8 
2021
 Key: FLINK-24826
 URL: https://issues.apache.org/jira/browse/FLINK-24826
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=listAdd.HEAP&env=2&revs=200&equid=off&quarts=on&extr=on
http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=listUpdate.HEAP&env=2&revs=200&equid=off&quarts=on&extr=on
http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3&ben=mapRemove.HEAP&env=2&revs=200&equid=off&quarts=on&extr=on


All affected benchmarks:
listAdd.HEAP
listAppend.HEAP
listGet.HEAP
listGetAndIterate.HEAP
listUpdate.HEAP
mapAdd.HEAP
mapIsEmpty.HEAP
mapKeys.HEAP
mapRemove.HEAP
mapUpdate.HEAP
mapValues.HEAP
valueAdd.HEAP

good commit: b3b50559cf22f188ddb9cad62ecfb83881c47961
bad commit: fc4f255644a64bb556b0dcefb165a9c772164c5b

It's very likely fc4f255644a64bb556b0dcefb165a9c772164c5b is the cause of the 
regression (in between there are only docs updates and this one is 
heap-related).

cc: [~Zakelly]





[jira] [Created] (FLINK-24611) Prevent JM from discarding state on checkpoint abortion

2021-10-21 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-24611:
-

 Summary: Prevent JM from discarding state on checkpoint abortion
 Key: FLINK-24611
 URL: https://issues.apache.org/jira/browse/FLINK-24611
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


When a checkpoint is aborted, JM discards any state that was sent to it and 
wasn't used in other checkpoints. This forces incremental state backends to 
wait for confirmation from JM before re-using this state. For changelog backend 
this is even more critical.


 One proposed approach was to make backends/TMs responsible for their state 
for as long as it is not shared with other TMs, i.e. until rescaling 
(private/shared state ownership track).
 However, that approach is quite invasive.

An alternative solution would be:
 1. SharedStateRegistry remembers the latest checkpoint for each shared state 
(instead of the usage count it keeps currently)
 2. CompletedCheckpointStore notifies it about the lowest valid checkpoint (on 
subsumption)
 3. SharedStateRegistry then discards any state associated with the lower 
(subsumed/aborted) checkpoints
 So the aborted checkpoint can only be discarded after some subsequent 
successful checkpoint (which can mark state as used).
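
The three steps above can be sketched as follows. This is a simplified, hypothetical model (string keys, no actual state handles), not Flink's SharedStateRegistry; all names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified registry that tracks the last checkpoint using each state. */
class LastUseRegistrySketch {
    // State key -> ID of the latest checkpoint (completed or aborted) that used it.
    private final Map<String, Long> lastUsedBy = new HashMap<>();

    /** Step 1: remember the latest checkpoint that references this state. */
    void register(String stateKey, long checkpointId) {
        lastUsedBy.merge(stateKey, checkpointId, Math::max);
    }

    /**
     * Steps 2 and 3: the checkpoint store reports the lowest still-valid checkpoint;
     * state whose last use is a lower (subsumed or aborted) checkpoint is discarded.
     */
    List<String> unregisterUnusedState(long lowestValidCheckpointId) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedBy.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < lowestValidCheckpointId) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
        return discarded;
    }

    public static void main(String[] args) {
        LastUseRegistrySketch registry = new LastUseRegistrySketch();
        registry.register("sst-1", 1); // checkpoint 1 completes
        registry.register("sst-2", 2); // checkpoint 2 is aborted; its state is kept for now
        registry.register("sst-2", 3); // checkpoint 3 re-uses sst-2 and completes
        // Checkpoint 3 subsumes 1 and 2, so only sst-1 is no longer referenced.
        System.out.println(registry.unregisterUnusedState(3)); // prints [sst-1]
    }
}
```

Note that nothing is discarded at abortion time; state from the aborted checkpoint 2 survives because a later successful checkpoint marked it as used.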

Only JM code is changed.

 

Implementation considerations.

On subsumption, JM needs to find all the unused state and discard it.
 This can either be done by
 1) simply traversing all entries; or by 
 2) maintaining a set of entries per checkpoint (e.g. a SortedMap from 
checkpoint ID to the set of entries). This allows skipping unnecessary 
traversal at the cost of higher memory usage

In both cases:
 - each entry stores last checkpoint ID it was used in (long)
 - key is hashed (even with plain traversal, map.entrySet.iterator.remove() 
computes hash internally)

Given the following constraints:
 - 10M state entries at most
 - 10 (retained) checkpoints at most
 - 10 checkpoints per second at most
 - state entry key takes 32B (usually UUID or two UUIDs)

The extra space for (2) would be on the order of 10M * 32 B = 320 MB (about 305 MiB).
 The extra time for (1) would be in order of 10M * 10 checkpoints per second * 
ratio of outdated entries per checkpoint. Depending on the ratio and the 
hardware, this could take up to hundreds of ms per second, blocking the main 
thread.
 So approach (2) seems reasonable.
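
A minimal sketch of approach (2), assuming string keys and a TreeMap as the per-checkpoint index. The class is hypothetical and only illustrates how the index avoids a full traversal on subsumption:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical registry with an extra index: checkpoint ID -> keys last used by it. */
class IndexedRegistrySketch {
    private final Map<String, Long> lastUsedBy = new HashMap<>();
    private final TreeMap<Long, Set<String>> keysByCheckpoint = new TreeMap<>();

    void register(String key, long checkpointId) {
        Long previous = lastUsedBy.get(key);
        if (previous != null && previous >= checkpointId) {
            return; // already tracked under the same or a newer checkpoint
        }
        if (previous != null) {
            // Move the key out of the set of its previous checkpoint.
            Set<String> old = keysByCheckpoint.get(previous);
            old.remove(key);
            if (old.isEmpty()) {
                keysByCheckpoint.remove(previous);
            }
        }
        lastUsedBy.put(key, checkpointId);
        keysByCheckpoint.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(key);
    }

    List<String> unregisterUnusedState(long lowestValidCheckpointId) {
        List<String> discarded = new ArrayList<>();
        // headMap is a live view of all checkpoints strictly below the bound,
        // so only outdated entries are visited.
        SortedMap<Long, Set<String>> outdated = keysByCheckpoint.headMap(lowestValidCheckpointId);
        for (Set<String> keys : outdated.values()) {
            for (String key : keys) {
                lastUsedBy.remove(key);
                discarded.add(key);
            }
        }
        outdated.clear(); // clearing the view also removes the entries from the index
        return discarded;
    }

    public static void main(String[] args) {
        IndexedRegistrySketch registry = new IndexedRegistrySketch();
        registry.register("sst-1", 1);
        registry.register("sst-2", 2);
        registry.register("sst-2", 3);
        System.out.println(registry.unregisterUnusedState(3)); // prints [sst-1]
    }
}
```

The cost of a subsumption is then proportional to the number of outdated entries rather than to the total number of entries, at the price of the second map.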

The following cases shouldn't pose any difficulties:
 1. Recovery, re-scaling, and state that is used by only some tasks or by none 
- we still register all states on recovery even after FLINK-22483/FLINK-24086
 2. PlaceholderStreamStateHandles
 3. Cross-task state sharing - not an issue as long as everything is managed by 
JM
 4. Dependencies between SharedStateRegistry and CompletedCheckpointStore - 
simple after FLINK-24086

The following should be kept in mind:
 1. On job cancellation, state of aborted checkpoints should be cleaned up 
explicitly
 2. Savepoints should be ignored and not change 
CheckpointStore.lowestCheckpointID



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24402) Add back-pressure metrics for the ChangelogStateBackend

2021-09-29 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-24402:
-

 Summary: Add back-pressure metrics for the ChangelogStateBackend
 Key: FLINK-24402
 URL: https://issues.apache.org/jira/browse/FLINK-24402
 Project: Flink
  Issue Type: Sub-task
  Components: Benchmarks
Reporter: Roman Khachatryan
 Fix For: 1.15.0


E.g. in-flight requests, request size, latency, number of “Logs” per request, 
errors.

 

With back-pressure (FLINK-23381) it's very important because the task will be 
shown as busy in the UI.





[jira] [Created] (FLINK-23971) PulsarSourceITCase.testIdleReader failed on azure

2021-08-25 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-23971:
-

 Summary: PulsarSourceITCase.testIdleReader failed on azure
 Key: FLINK-23971
 URL: https://issues.apache.org/jira/browse/FLINK-23971
 Project: Flink
  Issue Type: Bug
Reporter: Roman Khachatryan


{code}
[ERROR] Failures:
[ERROR]   PulsarSourceITCase>SourceTestSuiteBase.testIdleReader:193
Expected: Records consumed by Flink should be identical to test data and 
preserve the order in multiple splits
 but: Unexpected record 'tj7MpFRWX95GzBpSF3CCjxKSal6bRhR0aF'
{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22819&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=24448

This is the same error as in FLINK-23828





[jira] [Created] (FLINK-23950) Revert FLINK-23738 (i.e. unhide config, API, docs)

2021-08-24 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-23950:
-

 Summary: Revert FLINK-23738 (i.e. unhide config, API, docs)
 Key: FLINK-23950
 URL: https://issues.apache.org/jira/browse/FLINK-23950
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Runtime / State Backends
Reporter: Roman Khachatryan
 Fix For: 1.15.0








[jira] [Created] (FLINK-23876) Remove limitation about JDBC exactly once sink about multiple connections per transaction

2021-08-19 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-23876:
-

 Summary: Remove limitation about JDBC exactly once sink about 
multiple connections per transaction 
 Key: FLINK-23876
 URL: https://issues.apache.org/jira/browse/FLINK-23876
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC, Documentation
Affects Versions: 1.14.0
Reporter: Roman Khachatryan
 Fix For: 1.14.0


[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink]
says:{quote}Attention: In 1.13, Flink JDBC sink does not support exactly-once 
mode with MySQL or other databases that do not support multiple XA transaction 
per connection. We will improve the support in FLINK-22239.{quote}

In FLINK- connection pooling was added so this limitation doesn't apply anymore.
So this should be removed or replaced with a list of databases.

Connection pooling must be enabled explicitly, so the docs should list the 
databases requiring it (MySQL, PostgreSQL) and give example code.





[jira] [Created] (FLINK-23862) Race condition while cancelling task during initialization

2021-08-18 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-23862:
-

 Summary: Race condition while cancelling task during initialization
 Key: FLINK-23862
 URL: https://issues.apache.org/jira/browse/FLINK-23862
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.14.0
Reporter: Roman Khachatryan
 Fix For: 1.14.0


While debugging the recent failures in FLINK-22889, I see that sometimes the 
operator chain is not closed if the task is cancelled while it's being 
initialized.

 

The reason is that on restore(), cleanUpInvoke() is only called if there was an 
exception, including CancelTaskException.

The latter is only thrown if StreamTask.canceled is set, i.e. TaskCanceler has 
called StreamTask.cancel().

 

So if StreamTask is cancelled in between restore and normal invoke then it may 
not close the operator chain and not do other cleanup.

 

One solution is to make StreamTask.cleanup visible to and called from Task.
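
A rough sketch of that idea, assuming the caller owns the cleanup call. The Invokable interface and runTask method below are hypothetical stand-ins, not the actual Task or StreamTask API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

class TaskCleanupSketch {
    /** Hypothetical stand-in for an invokable such as StreamTask. */
    interface Invokable {
        void restore();
        void invoke();
        void cleanUp();
    }

    /** Hypothetical stand-in for the caller (Task), which always triggers cleanup. */
    static void runTask(Invokable invokable, BooleanSupplier canceled) {
        try {
            invokable.restore();
            if (canceled.getAsBoolean()) {
                return; // cancelled between restore and invoke: no exception is thrown
            }
            invokable.invoke();
        } finally {
            invokable.cleanUp(); // runs on success, on failure, and on the early return
        }
    }

    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        runTask(new Invokable() {
            public void restore() { calls.add("restore"); }
            public void invoke() { calls.add("invoke"); }
            public void cleanUp() { calls.add("cleanUp"); }
        }, () -> true);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [restore, cleanUp]
    }
}
```

Here cleanup no longer depends on an exception being thrown, which is the gap described above.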

 

cc: [~akalashnikov], [~pnowojski]





[jira] [Created] (FLINK-23811) Handle FINISHED subtasks in CommonTestUtils.waitForAllTaskRunning

2021-08-16 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-23811:
-

 Summary: Handle FINISHED subtasks in CommonTestUtils.waitForAllTaskRunning
 Key: FLINK-23811
 URL: https://issues.apache.org/jira/browse/FLINK-23811
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.14.0
Reporter: Roman Khachatryan


CommonTestUtils.waitForAllTaskRunning returns when all the subtasks are 
running AND the job is running and not finished. However, with FLIP-147, 
subtasks may finish while the job is still running. In that case the method 
never returns and times out instead.

 

The solution could be:
- For new tests that can have finished subtasks, return if every subtask is 
RUNNING || FINISHED
- For old tests (that assume no finished subtasks), throw an exception


Note that a subtask may be in some other state (e.g. CANCELLED) which is fine, 
as it can change after failing over the job.
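
The proposed check could be sketched roughly as follows. This is an illustration under assumptions, not the actual CommonTestUtils code: the State enum, the allowFinished flag, and the choice to throw IllegalStateException when an old-style test sees a FINISHED subtask are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the readiness check; not the actual CommonTestUtils code. */
final class SubtaskReadinessSketch {

    /** Hypothetical subset of subtask states, defined here only for the example. */
    enum State { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

    /**
     * Returns true once the caller may stop waiting.
     *
     * @param allowFinished true for new tests that tolerate finished subtasks,
     *                      false for old tests that assume none ever finish
     */
    static boolean allSubtasksReady(List<State> states, boolean allowFinished) {
        for (State state : states) {
            if (state == State.FINISHED) {
                if (!allowFinished) {
                    // Assumption: old tests fail fast instead of waiting for a timeout.
                    throw new IllegalStateException("Unexpected FINISHED subtask");
                }
                continue; // FINISHED counts as ready for new tests
            }
            if (state != State.RUNNING) {
                return false; // e.g. CANCELED: may still change after failover, keep polling
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<State> states = Arrays.asList(State.RUNNING, State.FINISHED);
        System.out.println(allSubtasksReady(states, true));
    }
}
```

A caller would poll this predicate until it returns true or a deadline passes. Any state other than RUNNING or FINISHED simply keeps the poll going.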

 

This change is extracted from FLINK-21090 into a separate ticket because 
multiple IT cases might be affected.





[jira] [Created] (FLINK-23770) Unable to recover after source fully finished

2021-08-13 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-23770:
-

 Summary: Unable to recover after source fully finished
 Key: FLINK-23770
 URL: https://issues.apache.org/jira/browse/FLINK-23770
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Roman Khachatryan
 Fix For: 1.14.0


When running one of the IT cases from 
https://github.com/apache/flink/pull/16773 
I see the following failure:
 {code}
10194 [flink-akka.actor.default-dispatcher-7] INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Can not restore vertex Source: Custom Source -> Timestamps/Watermarks(cbc357ccb763df2852fee8c4fc7d55f2) which contain both finished and unfinished operators
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$VerticesFinishedCache.calculateIfFinished(CheckpointCoordinator.java:1651) ~[classes/:?]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$VerticesFinishedCache.lambda$getOrUpdate$0(CheckpointCoordinator.java:1631) ~[classes/:?]
    at java.util.HashMap.computeIfAbsent(HashMap.java:1127) ~[?:1.8.0_271]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$VerticesFinishedCache.getOrUpdate(CheckpointCoordinator.java:1629) ~[classes/:?]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.validateFinishedOperators(CheckpointCoordinator.java:1674) ~[classes/:?]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1577) ~[classes/:?]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks(CheckpointCoordinator.java:1438) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:398) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasks(DefaultScheduler.java:317) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$null$2(DefaultScheduler.java:287) ~[classes/:?]
    at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) ~[?:1.8.0_271]
    at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701) ~[?:1.8.0_271]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_271]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) 
