[jira] [Created] (FLINK-35933) Skip distributing maxAllowedWatermark if there are no subtasks
Roman Khachatryan created FLINK-35933: - Summary: Skip distributing maxAllowedWatermark if there are no subtasks Key: FLINK-35933 URL: https://issues.apache.org/jira/browse/FLINK-35933 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.20.0 On the JM, `SourceCoordinator.announceCombinedWatermark` executes unnecessarily when there are no subtasks to distribute maxAllowedWatermark to. This involves heap and ConcurrentHashMap accesses and a lot of logging. -- This message was sent by Atlassian Jira (v8.20.10#820010)
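The improvement amounts to an early return before any per-subtask work. A minimal sketch of the idea (class and field names are illustrative, not the actual SourceCoordinator members):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: bail out before any heap/ConcurrentHashMap access
// or logging when there is nobody to distribute the watermark to.
class WatermarkAnnouncerSketch {
    final Map<Integer, Object> subtaskGateways = new ConcurrentHashMap<>();
    int announcements = 0;

    void announceCombinedWatermark(long maxAllowedWatermark) {
        if (subtaskGateways.isEmpty()) {
            return; // skip aggregation, map traversal, and logging entirely
        }
        announcements++;
        // ... compute the combined watermark and send it to each subtask ...
    }
}
```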
[jira] [Created] (FLINK-35787) DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a proper shutdown)
Roman Khachatryan created FLINK-35787: - Summary: DefaultSlotStatusSyncer might bring down JVM (exit code 239 instead of a proper shutdown) Key: FLINK-35787 URL: https://issues.apache.org/jira/browse/FLINK-35787 Project: Flink Issue Type: Bug Reporter: Roman Khachatryan In our internal CI, I've encountered the following error:
{code:java}
12:02:47,205 [ pool-126-thread-1] ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'pool-126-thread-1' produced an uncaught exception. Stopping the process...
java.util.concurrent.CompletionException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = java.util.concurrent.>
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:951) ~[?:?]
    at java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2282) ~[?:?]
    at org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138) ~[classes/:?]
    at org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722) ~[classes/:?]
    at org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645) ~[classes/:?]
    at org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$checkResourceRequirementsWithDelay$12(FineGrainedSlotManager.java:603) ~[classes/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38ce013a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@640a9cf7[Wrapped task = java.util.concurrent.CompletableFuture$UniHandle@f3d>
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705) ~[?:?]
    at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:949) ~[?:?]
    ... 11 more
{code}
From the code, it looks like the RM main thread executor was shut down, and that triggered the JVM exit:
{code:java}
CompletableFuture<Acknowledge> requestFuture =
        gateway.requestSlot(
                SlotID.getDynamicSlotID(resourceId),
                jobId,
                allocationId,
                resourceProfile,
                targetAddress,
                resourceManagerId,
                taskManagerRequestTimeout);
CompletableFuture<Void> returnedFuture = new CompletableFuture<>();
FutureUtils.assertNoException(
        requestFuture.handleAsync(
                (Acknowledge acknowledge, Throwable throwable) -> { ... },
                mainThreadExecutor));
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35786) NPE in BlobServer / shutdownHook
Roman Khachatryan created FLINK-35786: - Summary: NPE in BlobServer / shutdownHook Key: FLINK-35786 URL: https://issues.apache.org/jira/browse/FLINK-35786 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.19.1 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.20.0, 1.19.2 In its constructor, BlobServer registers a shutdown hook that closes the socket. Later in the constructor, BlobServer creates this socket (and checks that it is not null). But if the shutdown hook is invoked before the socket is opened, an NPE is thrown:
{code:java}
12:02:49,983 [PermanentBlobCache shutdown hook] INFO org.apache.flink.runtime.blob.PermanentBlobCache [] - Shutting down BLOB cache
12:02:49,985 [BlobServer shutdown hook] ERROR org.apache.flink.runtime.blob.BlobServer [] - Error during shutdown of BlobServer via JVM shutdown hook.
java.lang.NullPointerException: null
    at org.apache.flink.runtime.blob.BlobServer.close(BlobServer.java:358) ~[classes/:?]
    at org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39) ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:829) [?:?]
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
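The hazard is purely an ordering one: the hook is registered before the socket field is assigned, so the close path must tolerate a null field. A minimal self-contained sketch of that defensive close (field and method names are illustrative, not the real BlobServer members):

```java
import java.io.IOException;
import java.net.ServerSocket;

// Sketch: the shutdown hook is registered first, so close() guards
// against the socket not having been assigned yet.
class BlobServerSketch {
    private volatile ServerSocket serverSocket; // assigned late in the constructor

    BlobServerSketch(int port) throws IOException {
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
        // ... other initialization; the hook could in principle fire here ...
        this.serverSocket = new ServerSocket(port);
    }

    void close() {
        ServerSocket socket = this.serverSocket;
        if (socket != null) { // avoid the NPE if the hook fires before assignment
            try {
                socket.close(); // idempotent: safe to call more than once
            } catch (IOException ignored) {
            }
        }
    }

    boolean isClosed() {
        ServerSocket socket = this.serverSocket;
        return socket == null || socket.isClosed();
    }
}
```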
[jira] [Created] (FLINK-35769) State files might not be deleted on task cancellation
Roman Khachatryan created FLINK-35769: - Summary: State files might not be deleted on task cancellation Key: FLINK-35769 URL: https://issues.apache.org/jira/browse/FLINK-35769 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.19.1 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.20.0 We have a job in an infinite (fast) restart loop, crashing with a serialization issue. The issue here is that each restart seems to leak state files (not cleaning up the ones from the previous run):
{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep KeyedProcessOperator | wc -l 7990}}
{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep StreamingJoinOperator | wc -l 689}}
Eventually the TM will use too much disk space. The problem is in [https://github.com/apache/flink/blob/64f745a5b1fc14a2cba1ddd977ab8e8db9cf45a4/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L75]:
{code:java}
try {
    List<CompletableFuture<Void>> futures =
            transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
                    .collect(Collectors.toList());
    // Wait until either all futures completed successfully or one failed exceptionally.
    FutureUtils.completeAll(futures).get();
} catch (Exception e) {
    downloadRequests.stream()
            .map(StateHandleDownloadSpec::getDownloadDestination)
            .map(Path::toFile)
            .forEach(FileUtils::deleteDirectoryQuietly);
{code}
{{FileUtils::deleteDirectoryQuietly}} lists the files and deletes them. But if {{completeAll}} is interrupted, a download runnable might re-create them afterwards.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
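The general shape of a fix is to quiesce the download tasks before deleting the destination directory, so no still-running writer can race with the cleanup. A toy sketch of that idea using a plain ExecutorService (the helper and its names are illustrative, not Flink's actual API):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: wait for all in-flight download tasks to terminate, and only
// then delete the download destination. Deleting first would allow a
// still-running task to re-create files after the cleanup pass.
class DownloadCleanupSketch {
    /** Returns true if the directory still exists after cleanup. */
    static boolean quiesceThenDelete(ExecutorService downloadPool, Path downloadDestination)
            throws IOException, InterruptedException {
        downloadPool.shutdown(); // stop accepting new download tasks
        downloadPool.awaitTermination(10, TimeUnit.SECONDS); // wait for in-flight ones
        try (Stream<Path> paths = Files.walk(downloadDestination)) {
            List<Path> all = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path p : all) {
                Files.deleteIfExists(p); // children first, then the directory itself
            }
        }
        return Files.exists(downloadDestination);
    }
}
```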
[jira] [Created] (FLINK-35742) Don't create RocksDB CF if task cancellation is in progress
Roman Khachatryan created FLINK-35742: - Summary: Don't create RocksDB CF if task cancellation is in progress Key: FLINK-35742 URL: https://issues.apache.org/jira/browse/FLINK-35742 Project: Flink Issue Type: Improvement Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.20.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35557) MemoryManager only reserves memory per consumer type once
Roman Khachatryan created FLINK-35557: - Summary: MemoryManager only reserves memory per consumer type once Key: FLINK-35557 URL: https://issues.apache.org/jira/browse/FLINK-35557 Project: Flink Issue Type: Bug Components: Runtime / State Backends, Runtime / Task Affects Versions: 1.18.1, 1.19.0, 1.17.2, 1.16.3, 1.20.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.20.0, 1.19.1
# In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we [create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526] a reserve function
# The function [decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61] the available slot memory and fails if there's not enough memory
# We pass it to {{SharedResources.getOrAllocateSharedResource}}
# In {{SharedResources.getOrAllocateSharedResource}}, we check if the resource (memory) was already reserved under some key (e.g. {{state-rocks-managed-memory}})
# If not, we create a new one and call the reserve function
# If the resource was already reserved (not null), we do NOT reserve the memory again: [https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71]
So there will be only one (the first) memory reservation for RocksDB, for example, no matter how many state backends are created, meaning that managed memory limits are not enforced.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
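The reserve-only-once behavior described in the steps above can be reproduced with a toy model: because the reserve function only runs when the keyed resource is first created, later acquisitions under the same key never charge the budget again. This is a hypothetical miniature, not the actual SharedResources API:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the described behavior: computeIfAbsent only invokes the
// reservation for the first caller of a given key.
class SharedResourcesSketch {
    private final Map<String, Object> resources = new HashMap<>();
    long reservedBytes = 0;

    Object getOrAllocate(String key, long size) {
        return resources.computeIfAbsent(key, k -> {
            reservedBytes += size; // reserve function: runs only on the first call
            return new Object();
        });
    }
}
```

Two state backends asking for the same key end up with a single reservation, which is exactly why the configured limits are not applied per backend.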
[jira] [Created] (FLINK-35556) Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED
Roman Khachatryan created FLINK-35556: - Summary: Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED Key: FLINK-35556 URL: https://issues.apache.org/jira/browse/FLINK-35556 Project: Flink Issue Type: Bug Affects Versions: 1.19.0, 1.20.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.20.0, 1.19.1 See https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java#L39 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35501) Use common thread pools when transferring RocksDB state files
Roman Khachatryan created FLINK-35501: - Summary: Use common thread pools when transferring RocksDB state files Key: FLINK-35501 URL: https://issues.apache.org/jira/browse/FLINK-35501 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.20.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.20.0 Currently, each RocksDB state backend creates an executor backed by its own thread pool. This makes it difficult to control the total number of threads per TM, because a TM might have at least one task per slot and, theoretically, many state backends per task (because of chaining). Additionally, a common thread pool allows indirectly controlling the load on the underlying DFS (e.g. the total number of requests to S3 from a TM). -- This message was sent by Atlassian Jira (v8.20.10#820010)
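The improvement boils down to a single lazily-created, process-wide pool instead of one pool per state backend, which bounds the transfer threads (and hence concurrent DFS requests) once per TM. A minimal sketch under that assumption (the holder class and names are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: one shared transfer pool per process, created on first use.
// All state backends request the same instance, so the thread count is
// bounded by maxThreads regardless of how many backends exist.
class SharedTransferPools {
    private static volatile ExecutorService transferPool;

    static ExecutorService transferPool(int maxThreads) {
        ExecutorService pool = transferPool;
        if (pool == null) {
            synchronized (SharedTransferPools.class) {
                pool = transferPool;
                if (pool == null) {
                    pool = Executors.newFixedThreadPool(maxThreads);
                    transferPool = pool;
                }
            }
        }
        return pool;
    }
}
```

Note that the first caller's size wins; a real implementation would take the bound from TM-level configuration rather than from the call site.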
[jira] [Created] (FLINK-34994) JobIDLoggingITCase fails because of "checkpoint confirmation for unknown task"
Roman Khachatryan created FLINK-34994: - Summary: JobIDLoggingITCase fails because of "checkpoint confirmation for unknown task" Key: FLINK-34994 URL: https://issues.apache.org/jira/browse/FLINK-34994 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.20.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58640=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=8735] [https://github.com/apache/flink/actions/runs/8502821551/job/23287730632#step:10:8131] [https://github.com/apache/flink/actions/runs/8507870399/job/23300810619#step:10:8086] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34559) TVF Window Aggregations might get stuck
Roman Khachatryan created FLINK-34559: - Summary: TVF Window Aggregations might get stuck Key: FLINK-34559 URL: https://issues.apache.org/jira/browse/FLINK-34559 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.18.1, 1.19.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan RecordsWindowBuffer flushes buffered records in the following cases:
* watermark
* checkpoint barrier
* buffer overflow
In two-phase aggregations, this creates the following problems:
1) Local aggregation enters hard backpressure because, on flush, it outputs the data downstream without checking network buffer availability. This already disrupts normal checkpointing and watermark progression.
2) Global aggregation: when the window is large enough and/or the watermark is lagging, lots of data is flushed to the state backend (and the state is updated) in the checkpoint SYNC phase.
All this eventually causes checkpoint timeouts (10 minutes in our env). Example query:
{code:java}
INSERT INTO `target_table`
SELECT
  window_start,
  window_end,
  some,
  attributes,
  SUM(view_time) AS total_view_time,
  COUNT(*) AS num,
  LISTAGG(DISTINCT page_url) AS pages
FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR($rowtime), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, some, attributes;
{code}
As a quick fix, we might want to:
# limit the amount of data buffered in Global Aggregation nodes
# disable two-phase aggregations, i.e. Local Aggregations (we can try to limit buffering there too, but network buffer availability cannot be easily checked from the operator)
-- This message was sent by Atlassian Jira (v8.20.10#820010)
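Quick-fix idea 1 (capping the buffered data) can be sketched as a buffer that flushes eagerly when a record cap is hit, so the checkpoint-time (sync-phase) flush stays bounded. This is a hypothetical illustration of the bounding logic only, not the real RecordsWindowBuffer:

```java
// Sketch: flush early on a record cap so no single flush (in particular
// the one in the checkpoint sync phase) has to process an unbounded
// backlog. Counts stand in for the actual buffered records.
class BoundedWindowBufferSketch {
    final int maxBufferedRecords;
    int buffered = 0;
    int flushes = 0;

    BoundedWindowBufferSketch(int maxBufferedRecords) {
        this.maxBufferedRecords = maxBufferedRecords;
    }

    void addRecord() {
        buffered++;
        if (buffered >= maxBufferedRecords) {
            flush(); // eager flush keeps the worst-case flush size bounded
        }
    }

    void flush() { // also called on watermark / checkpoint barrier
        buffered = 0;
        flushes++;
    }
}
```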
[jira] [Created] (FLINK-34420) Various YARN tests fail after failing to download hadoop.tar.gz
Roman Khachatryan created FLINK-34420: - Summary: Various YARN tests fail after failing to download hadoop.tar.gz Key: FLINK-34420 URL: https://issues.apache.org/jira/browse/FLINK-34420 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.19.0, 1.18.2, 1.20.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.19.0, 1.18.2, 1.20.0
https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1702=logs=0e31ee24-31a6-528c-a4bf-45cde9b2a14e=696bc156-f753-5888-468e-42d78df39222=11334
https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1700=logs=bbbd0720-137e-5f59-95a5-b5d332f196d3=4769aa47-e87b-5ecd-1fb2-14d52396866d=9937
```
2024-02-09T19:21:57.8947690Z Feb 09 19:21:57 Pre-downloading Hadoop tarball
2024-02-09T19:21:57.9250518Z % Total% Received % Xferd Average Speed TimeTime Time Current
2024-02-09T19:21:57.9250844Z Dload Upload Total SpentLeft Speed
2024-02-09T19:21:57.9251910Z
2024-02-09T19:21:58.0005684Z 0 00 00 0 0 0 --:--:-- --:--:-- --:--:-- 0
2024-02-09T19:21:58.0006583Z 100 288 100 2880 0 3789 0 --:--:-- --:--:-- --:--:-- 3789
```
288 bytes is way too small, meaning we got a redirect (https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1700=logs=bbbd0720-137e-5f59-95a5-b5d332f196d3=4769aa47-e87b-5ecd-1fb2-14d52396866d=8056).
Later, the archive can't be unpacked: https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1700=logs=bbbd0720-137e-5f59-95a5-b5d332f196d3=4769aa47-e87b-5ecd-1fb2-14d52396866d=9657
```
#11 [ 7/28] COPY hadoop.tar.gz /tmp/hadoop.tar.gz
#11 CACHED
#12 [ 8/28] RUN set -x && mkdir -p /usr/local/hadoop && tar -xf /tmp/hadoop.tar.gz --strip-components=1 -C /usr/local/hadoop && rm /tmp/hadoop.tar.gz*
#12 0.175 + mkdir -p /usr/local/hadoop
#12 0.177 + tar -xf /tmp/hadoop.tar.gz --strip-components=1 -C /usr/local/hadoop
#12 0.178 tar: This does not look like a tar archive
#12 0.179
#12 0.179 gzip: stdin: not in gzip format
#12 0.179 tar: Child returned status 1
#12 0.179 tar: Error is not recoverable: exiting now
```
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34417) Add JobID to logging MDC
Roman Khachatryan created FLINK-34417: - Summary: Add JobID to logging MDC Key: FLINK-34417 URL: https://issues.apache.org/jira/browse/FLINK-34417 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / Coordination, Runtime / Task Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.19.0 Adding JobID to logging MDC allows to apply Structural Logging and analyze Flink logs more efficiently. -- This message was sent by Atlassian Jira (v8.20.10#820010)
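The MDC (mapped diagnostic context) idea is that each runtime thread tags its context with the JobID once, and every later log line on that thread can include it. A self-contained sketch of that pattern, with a plain ThreadLocal map standing in for org.slf4j.MDC so the example has no logging dependency (the key name "flink-job-id" is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the MDC pattern: set the JobID once per thread, then enrich
// every formatted log line with it. A real implementation would use
// org.slf4j.MDC.put(...) and a %X{...} pattern in the logging config.
class MdcSketch {
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static void enterJobScope(String jobId) {
        CONTEXT.get().put("flink-job-id", jobId); // key name is an assumption
    }

    static String format(String message) {
        String jobId = CONTEXT.get().get("flink-job-id");
        return (jobId == null ? "" : "[job " + jobId + "] ") + message;
    }
}
```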
[jira] [Created] (FLINK-34344) Wrong JobID in CheckpointStatsTracker
Roman Khachatryan created FLINK-34344: - Summary: Wrong JobID in CheckpointStatsTracker Key: FLINK-34344 URL: https://issues.apache.org/jira/browse/FLINK-34344 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.19.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.19.0 The job id is generated randomly:
```
public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) {
    this(numRememberedCheckpoints, metricGroup, new JobID(), Integer.MAX_VALUE);
}
```
This affects how it is logged (or reported elsewhere). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33590) CheckpointStatsTracker.totalNumberOfSubTasks not updated
Roman Khachatryan created FLINK-33590: - Summary: CheckpointStatsTracker.totalNumberOfSubTasks not updated Key: FLINK-33590 URL: https://issues.apache.org/jira/browse/FLINK-33590 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / Coordination Affects Versions: 1.18.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.18.1 On rescaling, the DoP (degree of parallelism) is obtained from the JobGraph. However, JobGraph vertices are not updated once created. This results in missing traces on rescaling (isComplete returns false). Instead, the DoP should be obtained from the DoP store. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33442) UnsupportedOperationException thrown from RocksDBIncrementalRestoreOperation
Roman Khachatryan created FLINK-33442: - Summary: UnsupportedOperationException thrown from RocksDBIncrementalRestoreOperation Key: FLINK-33442 URL: https://issues.apache.org/jira/browse/FLINK-33442 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.17.1 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.17.2 When using the new rescaling API, it's possible to get {code:java} 2023-10-31 18:25:05,179 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception. java.lang.UnsupportedOperationException: null at java.util.Collections$1.remove(Collections.java:4714) ~[?:?] at java.util.AbstractCollection.remove(AbstractCollection.java:299) ~[?:?] at org.apache.flink.runtime.checkpoint.StateObjectCollection.remove(StateObjectCollection.java:105) ~[flink-runtime-1.17.1-143.jar:1.17.1-143] at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithRescaling(RocksDBIncrementalRestoreOperation.java:294) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:167) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:327) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:512) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:99) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:338) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:355) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:166) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:735) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:710) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:676) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) [flink-runtime-1.17.1-143.jar:1.17.1-143] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) [flink-runtime-1.17.1-143.jar:1.17.1-143] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-runtime-1.17.1-143.jar:1.17.1-143] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[flink-runtime-1.17.1-143.jar:1.17.1-143] at java.lang.Thread.run(Thread.java:829) [?:?] 2023-10-31 18:25:05,182 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedProcessOperator_353a6b34b8b7f1c1d0fb4616d911049c_(1/2) from alternative (1/2), will retry while more alternatives are available. org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:407) ~[flink-dist-1.17.1-143.jar:1.17.1-143] at
[jira] [Created] (FLINK-31601) While waiting for resources, the resource check might be scheduled an unlimited number of times (Adaptive Scheduler)
Roman Khachatryan created FLINK-31601: - Summary: While waiting for resources, the resource check might be scheduled an unlimited number of times (Adaptive Scheduler) Key: FLINK-31601 URL: https://issues.apache.org/jira/browse/FLINK-31601 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.17.0 Reporter: Roman Khachatryan Fix For: 1.17.1 See [https://github.com/apache/flink/pull/22169#discussion_r1136395017]
{quote}when {{resourceStabilizationDeadline}} is not null, should we skip scheduling {{checkDesiredOrSufficientResourcesAvailable}} (on [line 166|https://github.com/apache/flink/blob/a64781b1ef8f129021bdcddd3b07548e6caa4a72/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java#L166])? Otherwise, we schedule as many checks as there are changes in resources.
{quote}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
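The guard suggested in the quoted review comment can be sketched as: only schedule a new check when no stabilization deadline is pending, so repeated resource-availability events do not pile up duplicate scheduled checks. Names echo the comment, but the class itself is a hypothetical illustration:

```java
// Sketch of the proposed guard. The counter stands in for scheduling
// checkDesiredOrSufficientResourcesAvailable; a null deadline means no
// check is currently pending.
class ResourceCheckSchedulingSketch {
    Long resourceStabilizationDeadline; // null = no check pending
    int scheduledChecks = 0;

    void onNewResourcesAvailable(long nowMs, long stabilizationMs) {
        if (resourceStabilizationDeadline != null) {
            return; // a check is already scheduled; don't schedule another
        }
        resourceStabilizationDeadline = nowMs + stabilizationMs;
        scheduledChecks++;
    }
}
```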
[jira] [Created] (FLINK-31261) Make AdaptiveScheduler aware of the (local) state size
Roman Khachatryan created FLINK-31261: - Summary: Make AdaptiveScheduler aware of the (local) state size Key: FLINK-31261 URL: https://issues.apache.org/jira/browse/FLINK-31261 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.18.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.18.0 FLINK-21450 makes the Adaptive Scheduler aware of Local Recovery. Each slot-group pair is assigned a score based on the keyGroupRange size. That score isn't always optimal; it could be improved by computing the score based on the actual state size on disk. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30073) Managed memory can be wasted if rocksdb memory is fixed-per-slot
Roman Khachatryan created FLINK-30073: - Summary: Managed memory can be wasted if rocksdb memory is fixed-per-slot Key: FLINK-30073 URL: https://issues.apache.org/jira/browse/FLINK-30073 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.15.2, 1.16.0, 1.17.0 Reporter: Roman Khachatryan When [state.backend.rocksdb.memory.fixed-per-slot|https://github.com/apache/flink/blob/ba4b182955867fedfa9891bf0bf430e92eeab41a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java#L75] is set, RocksDB does not use managed memory. However, the runtime [doesn't take this into account|https://github.com/apache/flink/blob/ba4b182955867fedfa9891bf0bf430e92eeab41a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java#L75] and still reserves managed memory according to the configured weights. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29985) SlotTable not closed on TM termination
Roman Khachatryan created FLINK-29985: - Summary: SlotTable not closed on TM termination Key: FLINK-29985 URL: https://issues.apache.org/jira/browse/FLINK-29985 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.16.0, 1.15.3 Reporter: Roman Khachatryan When a slot is released, the associated resources are released as well, in particular the MemoryManager. The MemoryManager might hold not only memory but also arbitrary shared resources (currently, PythonSharedResources and RocksDBSharedResources). When the TM is stopped by the JobManager, its slot table is closed, causing all its slots to be released. When the TM is stopped by SIGTERM (i.e. by an external resource manager), its slot table is NOT closed. That means that in standalone clusters, some resources might not be released. As of now, RocksDBSharedResources contains only ephemeral resources. Not sure about PythonSharedResources, but it is likely associated with a separate process. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29928) Allow sharing (RocksDB) memory between slots
Roman Khachatryan created FLINK-29928: - Summary: Allow sharing (RocksDB) memory between slots Key: FLINK-29928 URL: https://issues.apache.org/jira/browse/FLINK-29928 Project: Flink Issue Type: New Feature Components: Runtime / Configuration, Runtime / State Backends, Runtime / Task Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.17.0

h1. Background and motivation

RocksDB is one of the main consumers of off-heap memory, which it uses for BlockCache, MemTables, Indices and Bloom Filters. Since 1.10 (FLINK-7289), it is possible to:
- share these objects among RocksDB instances of the same slot
- bound the total memory usage by all RocksDB instances of a TM

The memory is divided between the slots equally (unless using fine-grained resource control). This is sub-optimal if some slots contain more memory-intensive tasks than others. The proposal is to widen the scope of memory sharing to the TM, so that it can be shared across all its RocksDB instances. That would reduce the overall memory consumption in exchange for weaker resource isolation.

h1. Proposed changes

h2. Configuration
- introduce "taskmanager.memory.managed.shared-fraction" (0..1, default 0)
-- cluster-level (yaml only)
-- the non-shared memory will be used as it is now (exclusively per slot)
- introduce "state.backend.memory.share-scope"
-- job-level (yaml and StateBackend)
-- possible values: NONE, SLOT, TASK_MANAGER
-- default: not set
-- overrides "state.backend.rocksdb.memory.fixed-per-slot" if both are set (but don't deprecate it, because it specifies the size)
- rely on the existing "state.backend.rocksdb.memory.managed" to decide whether the shared memory is managed or unmanaged
- when calculating TM-wise shared memory, ignore "taskmanager.memory.managed.consumer-weights" because RocksDB is the only consumer so far
- similarly, exclude StateBackend from weights calculations, so other consumers (e.g. PYTHON) can better utilize exclusive slot memory
- use cluster-level or default configuration when creating TM-wise shared RocksDB objects, e.g. "state.backend.rocksdb.memory.managed", "state.backend.rocksdb.memory.write-buffer-ratio"

h2. Example
{code}
taskmanager.memory.managed.size: 1gb
taskmanager.memory.managed.shared-fraction: .75 # all slots share 750Mb of shared managed memory
taskmanager.numberOfTaskSlots: 10               # each task slot gets 25Mb of exclusive managed memory
cluster.fine-grained-resource-management.enabled: false

job 1:
  state.backend.memory.share-scope: TASK_MANAGER
  state.backend.rocksdb.memory.managed: true
job 2:
  state.backend.memory.share-scope: TASK_MANAGER
  state.backend.rocksdb.memory.managed: true
job 3:
  state.backend.memory.share-scope: SLOT
  state.backend.rocksdb.memory.managed: true
job 4:
  state.backend.memory.share-scope: TASK_MANAGER
  state.backend.rocksdb.memory.managed: false
job 5:
  state.backend.memory.share-scope: TASK_MANAGER
  state.backend.rocksdb.memory.managed: false
{code}
Jobs 1 and 2 will use the same 750Mb of managed memory and will compete with each other. Job 3 will only use exclusive slot memory (25Mb per slot). Jobs 4 and 5 will use the same 750Mb of unmanaged memory and will compete with each other. Python code (or other consumers) will be able to use up to 25Mb per slot in jobs 1, 2, 4, 5.

h2. Creating and sharing RocksDB objects

Introduce sharedMemoryManager to TaskManager. Then, similarly to the current slot-wise sharing:
- MemoryManager manages OpaqueMemoryResource
- creation of the Cache object is done from the backend code on the first call

So flink-runtime doesn't have to depend on the state backend.

h2. Class loading and resolution

The RocksDB state backend is already part of the distribution. However, if a job also includes it, then classloader.resolve-order should be set to parent-first to prevent conflicts.

h2. Lifecycle

The cache object should be destroyed on TM termination; job or task completion should NOT close it.

h1. Testing

One way to test that the same RocksDB cache is used is via RocksDB metrics.

h1. Limitations
- classloader.resolve-order=child-first is not supported
- fine-grained-resource-management is not supported
- only RocksDB will be able to use TM-wise shared memory; other consumers may be adjusted later

cc: [~yunta], [~ym], [~liyu]
-- This message was sent by Atlassian Jira (v8.20.10#820010)
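The arithmetic behind the example's numbers (1gb managed, shared-fraction .75, 10 slots giving 750Mb shared and 25Mb exclusive per slot) can be sketched as follows; method names are illustrative and sizes are in MB, matching the ticket's rounding of "1gb" to 1000Mb:

```java
// Worked version of the proposed split: the shared part is
// fraction * managed size, and the remainder is divided evenly
// across the slots as exclusive managed memory.
class ManagedMemorySplitSketch {
    static long sharedMb(long managedMb, double sharedFraction) {
        return (long) (managedMb * sharedFraction);
    }

    static long exclusivePerSlotMb(long managedMb, double sharedFraction, int numSlots) {
        return (managedMb - sharedMb(managedMb, sharedFraction)) / numSlots;
    }
}
```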
[jira] [Created] (FLINK-29158) Fix logging in DefaultCompletedCheckpointStore
Roman Khachatryan created FLINK-29158: - Summary: Fix logging in DefaultCompletedCheckpointStore Key: FLINK-29158 URL: https://issues.apache.org/jira/browse/FLINK-29158 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.15.2 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.3 See [https://github.com/apache/flink/pull/16582#discussion_r949214456] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29157) Clarify the contract between CompletedCheckpointStore and SharedStateRegistry
Roman Khachatryan created FLINK-29157: - Summary: Clarify the contract between CompletedCheckpointStore and SharedStateRegistry Key: FLINK-29157 URL: https://issues.apache.org/jira/browse/FLINK-29157 Project: Flink Issue Type: Technical Debt Components: Runtime / Checkpointing Affects Versions: 1.15.2, 1.16.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.16.0, 1.15.3 After FLINK-24611, CompletedCheckpointStore is required to call SharedStateRegistry.unregisterUnusedState() on checkpoint subsumption and shutdown. Although it's not clear whether CompletedCheckpointStore is internal, there are in fact external implementations (which weren't updated accordingly). After FLINK-25872, CompletedCheckpointStore must also call checkpointsCleaner.cleanSubsumedCheckpoints. Another issue with a custom implementation was using different Java objects for the state in CheckpointStore and SharedStateRegistry (after FLINK-24086). So it makes sense to: * clarify the contract (different in 1.15 and 1.16) * require using the same checkpoint objects in SharedStateRegistryFactory and CompletedCheckpointStore * mark the interface(s) as PublicEvolving -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28976) Changelog 1st materialization delayed unnecessarily
Roman Khachatryan created FLINK-28976: - Summary: Changelog 1st materialization delayed unnecessarily Key: FLINK-28976 URL: https://issues.apache.org/jira/browse/FLINK-28976 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.1, 1.16.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.16.0, 1.15.2 In PeriodicMaterializationManager.start(), the 1st materialization is scheduled with a delay of materialization_interval + random_offset. Here, random_offset is added to avoid the thundering herd problem. The next materialization will be scheduled with a delay of only materialization_interval. That means that the 1st materialization will have to compact up to 2 times more state changes than the subsequent ones, which in turn can cause FLINK-26590 or other problems. -- This message was sent by Atlassian Jira (v8.20.10#820010)
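The two scheduling policies can be sketched as follows. The spreadInitialDelay variant is only one possible fix (drawing the entire initial delay from within the interval while keeping the jitter); it is an assumption for illustration, not the committed change.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the two scheduling policies for the first materialization.
final class MaterializationDelays {
    // Current behaviour: the first run waits interval + random offset, so it
    // can accumulate up to ~2x the usual amount of state changes to compact.
    static long currentInitialDelay(long intervalMs) {
        long randomOffset = ThreadLocalRandom.current().nextLong(intervalMs);
        return intervalMs + randomOffset;
    }

    // One possible fix: keep the jitter (still avoiding the thundering herd)
    // but draw the whole initial delay from [0, interval), so the first
    // materialization never waits longer than the regular period.
    static long spreadInitialDelay(long intervalMs) {
        return ThreadLocalRandom.current().nextLong(intervalMs);
    }
}
```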
[jira] [Created] (FLINK-28931) BlockingPartitionBenchmark doesn't compile
Roman Khachatryan created FLINK-28931: - Summary: BlockingPartitionBenchmark doesn't compile Key: FLINK-28931 URL: https://issues.apache.org/jira/browse/FLINK-28931 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.16.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan {code} 10:15:12 [ERROR] /home/jenkins/workspace/flink-master-benchmarks-java8/flink-benchmarks/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java:117:50: error: cannot find symbol {code} Caused by https://github.com/apache/flink/commit/9f5d0c48f198ff69a175f630832687ba02cf4c3e#diff-f72e79ebd747b6fde91988d65de9121a5907c97e4630cb1e30ab65601b4d9753R79 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28647) Remove separate error handling and adjust documentation for CLAIM mode + RocksDB native savepoint
Roman Khachatryan created FLINK-28647: - Summary: Remove separate error handling and adjust documentation for CLAIM mode + RocksDB native savepoint Key: FLINK-28647 URL: https://issues.apache.org/jira/browse/FLINK-28647 Project: Flink Issue Type: Improvement Components: Documentation, Runtime / Checkpointing Affects Versions: 1.16.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.16.0 After FLINK-25872, checkpoint folder deletion is not performed as long as there is some state from that checkpoint used by other checkpoints. Therefore, the following changes could be reverted/adjusted: * FLINK-25745 e8bcbfd5a48fd8d3ca48ef7803867569214e0dbc Do not log exception * FLINK-25745 c1f5c5320150402fc0cb4fbf3a31f9a27b1e4d9a Document incremental savepoints in CLAIM mode limitation cc: [~Yanfei Lei] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28597) Empty checkpoint folders not deleted on job cancellation if their shared state is still in use
Roman Khachatryan created FLINK-28597: - Summary: Empty checkpoint folders not deleted on job cancellation if their shared state is still in use Key: FLINK-28597 URL: https://issues.apache.org/jira/browse/FLINK-28597 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.16.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.16.0 After FLINK-25872, SharedStateRegistry registers all state handles, including private ones. Only once the state isn't in use AND the checkpoint is subsumed will it actually be discarded. This is done to prevent premature deletion when recovering in CLAIM mode: 1. RocksDB native savepoint folder (shared state is stored in the chk-xx folder, so it might fail the deletion) 2. Initial non-changelog checkpoint when switching to changelog-based checkpoints (private state of the initial checkpoint might be included into later checkpoints and its deletion would invalidate them) As a consequence, checkpoint folders are retained for longer, which might be confusing. In case of a crash, more folders will remain. cc: [~Yanfei Lei], [~ym] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-27571) Recognize "less is better" benchmarks in regression detection script
Roman Khachatryan created FLINK-27571: - Summary: Recognize "less is better" benchmarks in regression detection script Key: FLINK-27571 URL: https://issues.apache.org/jira/browse/FLINK-27571 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.16.0 Reporter: Roman Khachatryan Attachments: Screenshot_2022-05-09_10-33-11.png http://codespeed.dak8s.net:8000/timeline/#/?exe=5=schedulingDownstreamTasks.BATCH=on=on=off=2=200 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27556) Performance regression in checkpointSingleInput.UNALIGNED on 29.04.2022
Roman Khachatryan created FLINK-27556: - Summary: Performance regression in checkpointSingleInput.UNALIGNED on 29.04.2022 Key: FLINK-27556 URL: https://issues.apache.org/jira/browse/FLINK-27556 Project: Flink Issue Type: Bug Components: Benchmarks, Runtime / Checkpointing Affects Versions: 1.16.0 Reporter: Roman Khachatryan http://codespeed.dak8s.net:8000/timeline/#/?exe=1=checkpointSingleInput.UNALIGNED=on=on=off=2=200 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27555) Performance regression in schedulingDownstreamTasks on 02.05.2022
Roman Khachatryan created FLINK-27555: - Summary: Performance regression in schedulingDownstreamTasks on 02.05.2022 Key: FLINK-27555 URL: https://issues.apache.org/jira/browse/FLINK-27555 Project: Flink Issue Type: Bug Components: Benchmarks, Runtime / Coordination Affects Versions: 1.16.0 Reporter: Roman Khachatryan http://codespeed.dak8s.net:8000/timeline/#/?exe=5=schedulingDownstreamTasks.BATCH=on=on=off=2=200 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27150) Improve error reporting in Flink UI
Roman Khachatryan created FLINK-27150: - Summary: Improve error reporting in Flink UI Key: FLINK-27150 URL: https://issues.apache.org/jira/browse/FLINK-27150 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Affects Versions: 1.16.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.16.0 - don't hide after a timeout (was: 4.5s) - expand the message box -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27149) KafkaSourceE2ECase.testScaleUp failed on AZP
Roman Khachatryan created FLINK-27149: - Summary: KafkaSourceE2ECase.testScaleUp failed on AZP Key: FLINK-27149 URL: https://issues.apache.org/jira/browse/FLINK-27149 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Tests Affects Versions: 1.16.0 Reporter: Roman Khachatryan Fix For: 1.16.0 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34404=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=15393 {code:java} [INFO] --- [INFO] T E S T S [INFO] --- [INFO] Running org.apache.flink.tests.util.kafka.KafkaSourceE2ECase [ERROR] Tests run: 16, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 174.171 s <<< FAILURE! - in org.apache.flink.tests.util.kafka.KafkaSourceE2ECase [ERROR] org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testScaleUp(TestEnvironment, DataStreamSourceExternalContext, CheckpointingMode)[1] Time elapsed: 2.7 s <<< ERROR! java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending. 
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.restartFromSavepoint(SourceTestSuiteBase.java:343) at org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.testScaleUp(SourceTestSuiteBase.java:258) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92) at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) at
[jira] [Created] (FLINK-27148) UnalignedCheckpointITCase fails on AZP
Roman Khachatryan created FLINK-27148: - Summary: UnalignedCheckpointITCase fails on AZP Key: FLINK-27148 URL: https://issues.apache.org/jira/browse/FLINK-27148 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / Network Affects Versions: 1.15.0, 1.16.0 Reporter: Roman Khachatryan Fix For: 1.15.0, 1.16.0 [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=5812] [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=6018] {code} [ERROR] Tests run: 22, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 174.732 s <<< FAILURE! - in org.apache.flink.test.checkpointing.UnalignedCheckpointITCase [ERROR] UnalignedCheckpointITCase.execute Time elapsed: 6.408 s <<< ERROR! org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:184) at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:287) at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.Verifier$1.evaluate(Verifier.java:35) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at org.junit.runner.JUnitCore.run(JUnitCore.java:115) at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) at
[jira] [Created] (FLINK-27144) Provide timeout details when calling FutureUtils.orTimeout
Roman Khachatryan created FLINK-27144: - Summary: Provide timeout details when calling FutureUtils.orTimeout Key: FLINK-27144 URL: https://issues.apache.org/jira/browse/FLINK-27144 Project: Flink Issue Type: Improvement Affects Versions: 1.14.4, 1.13.6, 1.15.0, 1.16.0 Reporter: Roman Khachatryan Fix For: 1.15.0, 1.16.0 There are two versions of FutureUtils.orTimeout() that use null as the error message when the timeout happens. They are used by (in 1.16): * DefaultScheduler.registerProducedPartitions * DeclarativeSlotPoolBridge.internalRequestNewSlot * CompletedOperationCache.closeAsync * TaskManagerRunner.onFatalError * RestClusterClient.getWebMonitorBaseUrl This makes it difficult to debug those timeouts, in particular during shutdown. See [this|https://lists.apache.org/thread/5wxv2occohc6ky1g754n7o8b8ssjcqf5] thread for an example. Replacing null with an actual message would ease debugging; the message could be made mandatory. -- This message was sent by Atlassian Jira (v8.20.1#820001)
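As a sketch of what a message-carrying timeout could look like, the helper below (hypothetical, not Flink's actual FutureUtils) completes the returned future with a TimeoutException that carries a descriptive message instead of null:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative helper: like JDK CompletableFuture.orTimeout (Java 9+), but the
// TimeoutException says what timed out instead of carrying a null message.
final class Timeouts {
    static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future, long timeout, TimeUnit unit, String message) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // JDK orTimeout completes the future with a message-less TimeoutException;
        // we translate it into one that identifies the timed-out operation.
        future.orTimeout(timeout, unit).whenComplete((value, error) -> {
            if (error instanceof TimeoutException) {
                result.completeExceptionally(new TimeoutException(message));
            } else if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        return result;
    }
}
```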
[jira] [Created] (FLINK-27132) CheckpointResourcesCleanupRunner might discard shared state of the initial checkpoint
Roman Khachatryan created FLINK-27132: - Summary: CheckpointResourcesCleanupRunner might discard shared state of the initial checkpoint Key: FLINK-27132 URL: https://issues.apache.org/jira/browse/FLINK-27132 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.15.0, 1.16.0 Reporter: Roman Khachatryan Consider the following case: # A job starts from a checkpoint in NO_CLAIM mode, with incremental checkpoints enabled # It produces some new checkpoints and subsumes the original one (not discarding shared state - before FLINK-24611 or after FLINK-26985) # The job terminates abruptly # The cleaner is started for that job # ZK doesn't have the initial checkpoint, so the store will load only the new checkpoints (created in 2). Shared state is registered # The store is shut down - discarding all the checkpoints and also any shared state In step 6, if some checkpoint uses the initial state, it will also be discarded [~mapohl] could you please confirm this? cc: [~yunta] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27114) On JM restart, the information about the initial checkpoints can be lost
Roman Khachatryan created FLINK-27114: - Summary: On JM restart, the information about the initial checkpoints can be lost Key: FLINK-27114 URL: https://issues.apache.org/jira/browse/FLINK-27114 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.14.4, 1.15.0, 1.16.0 Reporter: Roman Khachatryan Fix For: 1.16.0, 1.14.5, 1.15.1 Scenario (1.14): # A job starts from an existing checkpoint 1, with incremental checkpoints enabled # Checkpoint 1 is loaded with discardOnSubsume=false by CheckpointCoordinator.restoreSavepoint # A new checkpoint 2 completes, it reuses some state from the initial checkpoint # At some point, checkpoint 1 is subsumed, but the state is not discarded (thanks to discardOnSubsume=false, ref counts stay 1) # JM crashes # JM restarts, loads the checkpoints 2..x from ZK (or other store) - discardOnSubsume=true (as deserialized from handles) # At some point, checkpoint 2 is subsumed and the initial shared state is not used anymore; because checkpoint 2 has discardOnSubsume=true, shared state will be erroneously discarded In 1.15, there were the following changes: # RestoreMode was added; only NO_CLAIM and LEGACY modes are affected # SharedStateRegistry was changed from refCounts to highest checkpoint ID # In step (7), state will not be discarded; however, because it's impossible to distinguish initial state from the state created by this job, the latter will not be discarded as well, leading to left-over state artifacts. The proposed solution is to store the initial checkpoint ID (in store such as ZK or in checkpoints) and adjust steps 6 or 7. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26968) Bump CopyOnWriteStateMap entry version before write
Roman Khachatryan created FLINK-26968: - Summary: Bump CopyOnWriteStateMap entry version before write Key: FLINK-26968 URL: https://issues.apache.org/jira/browse/FLINK-26968 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Roman Khachatryan Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26967) Fix race condition in CopyOnWriteStateMap
Roman Khachatryan created FLINK-26967: - Summary: Fix race condition in CopyOnWriteStateMap Key: FLINK-26967 URL: https://issues.apache.org/jira/browse/FLINK-26967 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Roman Khachatryan Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26966) Implement incremental checkpoints
Roman Khachatryan created FLINK-26966: - Summary: Implement incremental checkpoints Key: FLINK-26966 URL: https://issues.apache.org/jira/browse/FLINK-26966 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Roman Khachatryan Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26965) Allow reuse of PeriodicMaterializationManager
Roman Khachatryan created FLINK-26965: - Summary: Allow reuse of PeriodicMaterializationManager Key: FLINK-26965 URL: https://issues.apache.org/jira/browse/FLINK-26965 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Roman Khachatryan Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26964) Notify CheckpointStrategy about checkpoint completion/abortion
Roman Khachatryan created FLINK-26964: - Summary: Notify CheckpointStrategy about checkpoint completion/abortion Key: FLINK-26964 URL: https://issues.apache.org/jira/browse/FLINK-26964 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Roman Khachatryan Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26963) Allow heap state backend creation customization
Roman Khachatryan created FLINK-26963: - Summary: Allow heap state backend creation customization Key: FLINK-26963 URL: https://issues.apache.org/jira/browse/FLINK-26963 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26956) AZP: don't log "[ERROR] Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError"
Roman Khachatryan created FLINK-26956: - Summary: AZP: don't log "[ERROR] Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError" Key: FLINK-26956 URL: https://issues.apache.org/jira/browse/FLINK-26956 Project: Flink Issue Type: Improvement Components: Test Infrastructure Affects Versions: 1.16.0 Reporter: Roman Khachatryan The message makes searching for real errors more difficult. Probably grep -v will suffice, but need to be careful with exit codes. -- This message was sent by Atlassian Jira (v8.20.1#820001)
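A minimal sketch of the suggested grep -v approach with the exit-code caveat handled: grep exits with status 1 when every line was filtered out, which must not be treated as a pipeline failure (while real grep errors, exit status greater than 1, still should be).

```shell
# Drop the noisy JAVA_TOOL_OPTIONS line but preserve failure semantics.
# grep returns 1 when nothing survives the filter (all input was noise),
# which would look like an error under `set -e`/pipefail; translate that
# back to success, while genuine grep failures (exit > 1) remain nonzero.
filter_noise() {
    grep -v 'Picked up JAVA_TOOL_OPTIONS' || [ $? -eq 1 ]
}

# Real errors pass through untouched:
filtered=$(printf '%s\n' \
    '[ERROR] Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError' \
    'real error' | filter_noise)

# All-noise input still exits 0 instead of aborting the build step:
printf '[ERROR] Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError\n' | filter_noise
status=$?
```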
[jira] [Created] (FLINK-26912) EventTimeWindowCheckpointingITCase.testTumblingTimeWindow failed on azure
Roman Khachatryan created FLINK-26912: - Summary: EventTimeWindowCheckpointingITCase.testTumblingTimeWindow failed on azure Key: FLINK-26912 URL: https://issues.apache.org/jira/browse/FLINK-26912 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.16.0 Reporter: Roman Khachatryan Fix For: 1.16.0 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=33856=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=13133 {code} 2022-03-28T21:19:57.3494606Z Mar 28 21:19:57 [ERROR] Tests run: 60, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 413.405 s <<< FAILURE! - in org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase 2022-03-28T21:19:57.3496371Z Mar 28 21:19:57 [ERROR] EventTimeWindowCheckpointingITCase.testTumblingTimeWindow Time elapsed: 21.757 s <<< FAILURE! 2022-03-28T21:19:57.3497179Z Mar 28 21:19:57 java.lang.AssertionError: Job completed with illegal application status: UNKNOWN. 2022-03-28T21:19:57.3497773Z Mar 28 21:19:57at org.junit.Assert.fail(Assert.java:89) 2022-03-28T21:19:57.3498549Z Mar 28 21:19:57at org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.testTumblingTimeWindow(EventTimeWindowCheckpointingITCase.java:350) 2022-03-28T21:19:57.3499831Z Mar 28 21:19:57at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2022-03-28T21:19:57.3501367Z Mar 28 21:19:57at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 2022-03-28T21:19:57.3502820Z Mar 28 21:19:57at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2022-03-28T21:19:57.3503493Z Mar 28 21:19:57at java.lang.reflect.Method.invoke(Method.java:498) 2022-03-28T21:19:57.3504135Z Mar 28 21:19:57at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) 2022-03-28T21:19:57.3504854Z Mar 28 21:19:57at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
2022-03-28T21:19:57.3505569Z Mar 28 21:19:57at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) 2022-03-28T21:19:57.3506256Z Mar 28 21:19:57at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 2022-03-28T21:19:57.3506979Z Mar 28 21:19:57at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 2022-03-28T21:19:57.3508161Z Mar 28 21:19:57at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 2022-03-28T21:19:57.3509166Z Mar 28 21:19:57at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) 2022-03-28T21:19:57.3509812Z Mar 28 21:19:57at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) 2022-03-28T21:19:57.3510607Z Mar 28 21:19:57at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) 2022-03-28T21:19:57.3511226Z Mar 28 21:19:57at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) 2022-03-28T21:19:57.3511889Z Mar 28 21:19:57at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) 2022-03-28T21:19:57.3512551Z Mar 28 21:19:57at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) 2022-03-28T21:19:57.3513209Z Mar 28 21:19:57at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) 2022-03-28T21:19:57.3513911Z Mar 28 21:19:57at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) 2022-03-28T21:19:57.3514562Z Mar 28 21:19:57at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) 2022-03-28T21:19:57.3515168Z Mar 28 21:19:57at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) 2022-03-28T21:19:57.3515774Z Mar 28 21:19:57at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) 2022-03-28T21:19:57.3516396Z Mar 28 21:19:57at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) 2022-03-28T21:19:57.3517018Z Mar 28 21:19:57at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) 2022-03-28T21:19:57.3517621Z Mar 28 21:19:57at org.junit.runners.ParentRunner.run(ParentRunner.java:413) 2022-03-28T21:19:57.3518185Z Mar 28 21:19:57at org.junit.runners.Suite.runChild(Suite.java:128) 2022-03-28T21:19:57.3518729Z Mar 28 21:19:57at org.junit.runners.Suite.runChild(Suite.java:27) 2022-03-28T21:19:57.3519282Z Mar 28 21:19:57at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) 2022-03-28T21:19:57.3519888Z Mar 28 21:19:57at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) 2022-03-28T21:19:57.3520621Z Mar 28 21:19:57at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) 2022-03-28T21:19:57.3521390Z Mar 28 21:19:57at
[jira] [Created] (FLINK-26853) HeapStateBackend ignores metadata updates in certain cases
Roman Khachatryan created FLINK-26853: - Summary: HeapStateBackend ignores metadata updates in certain cases Key: FLINK-26853 URL: https://issues.apache.org/jira/browse/FLINK-26853 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.14.4, 1.15.0, 1.16.0 Reporter: Roman Khachatryan On recovery, HeapRestoreOperation reads state handles one by one; * each handle contains metadata at the beginning; * the metadata is always read, but not actually used if a state with the corresponding name was already registered In a rare case of downscaling + multiple checkpoints with different metadata, this might lead to data being deserialized incorrectly (always using the initial metadata). It also prevents incremental checkpoints with schema evolution. On first access, however, the backend itself will update (merge) the metadata, so new state updates are not affected. -- This message was sent by Atlassian Jira (v8.20.1#820001)
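The failure mode described above boils down to first-wins registration. The sketch below is a self-contained illustration of that pattern, not the actual HeapRestoreOperation code; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: handles are processed one by one, and first-wins
// registration silently drops metadata from later handles, so a schema
// evolved in a newer checkpoint is never picked up during restore.
final class MetadataRegistration {
    static Map<String, String> firstWins(List<Map<String, String>> handles) {
        Map<String, String> registered = new HashMap<>();
        for (Map<String, String> handle : handles) {
            // Bug pattern: putIfAbsent keeps the metadata of the first handle
            // that mentioned the state name, ignoring later updates.
            handle.forEach(registered::putIfAbsent);
        }
        return registered;
    }
}
```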
[jira] [Created] (FLINK-26685) Investigate/improve tests stability when using InMemory Changelog implementation
Roman Khachatryan created FLINK-26685: - Summary: Investigate/improve tests stability when using InMemory Changelog implementation Key: FLINK-26685 URL: https://issues.apache.org/jira/browse/FLINK-26685 Project: Flink Issue Type: Improvement Components: Runtime / State Backends, Test Infrastructure Affects Versions: 1.15.0, 1.16.0 Reporter: Roman Khachatryan Fix For: 1.16.0 Large scale tests often fail when using InMemory Changelog, either because of excessive GC pressure, serialization, or exceeding memory limits. So far, it was fixed by using FS implementation on a case-by-case basis. Always using FS isn't straightforward. Investigate if there are any ways to uniformly use FS impl. or stabilize in-memory impl. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26636) EmbeddedMultiThreadDependencyTests test_add_python_file failed on azure
Roman Khachatryan created FLINK-26636: - Summary: EmbeddedMultiThreadDependencyTests test_add_python_file failed on azure Key: FLINK-26636 URL: https://issues.apache.org/jira/browse/FLINK-26636 Project: Flink Issue Type: Bug Reporter: Roman Khachatryan https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32981=logs=e92ecf6d-e207-5a42-7ff7-528ff0c5b259=40fc352e-9b4c-5fd8-363f-628f24b01ec2=28024 {code} 2022-03-14T04:11:09.7985367Z Mar 14 04:11:09 === FAILURES === 2022-03-14T04:11:09.7988965Z Mar 14 04:11:09 ___ EmbeddedMultiThreadDependencyTests.test_add_python_file 2022-03-14T04:11:09.7989557Z Mar 14 04:11:09 2022-03-14T04:11:09.7990098Z Mar 14 04:11:09 self = 2022-03-14T04:11:09.7990644Z Mar 14 04:11:09 2022-03-14T04:11:09.7991165Z Mar 14 04:11:09 def test_add_python_file(self): 2022-03-14T04:11:09.7991785Z Mar 14 04:11:09 python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4())) 2022-03-14T04:11:09.7992318Z Mar 14 04:11:09 os.mkdir(python_file_dir) 2022-03-14T04:11:09.7993113Z Mar 14 04:11:09 python_file_path = os.path.join(python_file_dir, "test_dependency_manage_lib.py") 2022-03-14T04:11:09.7994443Z Mar 14 04:11:09 with open(python_file_path, 'w') as f: 2022-03-14T04:11:09.7995251Z Mar 14 04:11:09 f.write("def add_two(a):\nraise Exception('This function should not be called!')") 2022-03-14T04:11:09.7997796Z Mar 14 04:11:09 self.t_env.add_python_file(python_file_path) 2022-03-14T04:11:09.7998592Z Mar 14 04:11:09 2022-03-14T04:11:09.7999298Z Mar 14 04:11:09 python_file_dir_with_higher_priority = os.path.join( 2022-03-14T04:11:09.8000146Z Mar 14 04:11:09 self.tempdir, "python_file_dir_" + str(uuid.uuid4())) 2022-03-14T04:11:09.8001027Z Mar 14 04:11:09 os.mkdir(python_file_dir_with_higher_priority) 2022-03-14T04:11:09.8001891Z Mar 14 04:11:09 python_file_path_higher_priority = os.path.join(python_file_dir_with_higher_priority, 2022-03-14T04:11:09.8002526Z Mar 14 04:11:09 "test_dependency_manage_lib.py") 
2022-03-14T04:11:09.8003945Z Mar 14 04:11:09 with open(python_file_path_higher_priority, 'w') as f: 2022-03-14T04:11:09.8004664Z Mar 14 04:11:09 f.write("def add_two(a):\nreturn a + 2") 2022-03-14T04:11:09.8005208Z Mar 14 04:11:09 self.t_env.add_python_file(python_file_path_higher_priority) 2022-03-14T04:11:09.8005648Z Mar 14 04:11:09 2022-03-14T04:11:09.8005992Z Mar 14 04:11:09 def plus_two(i): 2022-03-14T04:11:09.8006430Z Mar 14 04:11:09 from test_dependency_manage_lib import add_two 2022-03-14T04:11:09.8006857Z Mar 14 04:11:09 return add_two(i) 2022-03-14T04:11:09.8007204Z Mar 14 04:11:09 2022-03-14T04:11:09.8007605Z Mar 14 04:11:09 self.t_env.create_temporary_system_function( 2022-03-14T04:11:09.8008206Z Mar 14 04:11:09 "add_two", udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT())) 2022-03-14T04:11:09.8008742Z Mar 14 04:11:09 table_sink = source_sink_utils.TestAppendSink( 2022-03-14T04:11:09.8009465Z Mar 14 04:11:09 ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()]) 2022-03-14T04:11:09.8009993Z Mar 14 04:11:09 self.t_env.register_table_sink("Results", table_sink) 2022-03-14T04:11:09.8010763Z Mar 14 04:11:09 t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b']) 2022-03-14T04:11:09.8011324Z Mar 14 04:11:09 > t.select(expr.call("add_two", t.a), t.a).execute_insert("Results").wait() 2022-03-14T04:11:09.8011779Z Mar 14 04:11:09 2022-03-14T04:11:09.8012176Z Mar 14 04:11:09 pyflink/table/tests/test_dependency.py:63: 2022-03-14T04:11:09.8012677Z Mar 14 04:11:09 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 2022-03-14T04:11:09.8013574Z Mar 14 04:11:09 pyflink/table/table_result.py:76: in wait 2022-03-14T04:11:09.8014041Z Mar 14 04:11:09 get_method(self._j_table_result, "await")() 2022-03-14T04:11:09.8014843Z Mar 14 04:11:09 .tox/py38-cython/lib/python3.8/site-packages/py4j/java_gateway.py:1321: in __call__ 2022-03-14T04:11:09.8015387Z Mar 14 04:11:09 return_value = get_return_value( 2022-03-14T04:11:09.8015843Z Mar 14 
04:11:09 pyflink/util/exceptions.py:146: in deco 2022-03-14T04:11:09.8016267Z Mar 14 04:11:09 return f(*a, **kw) 2022-03-14T04:11:09.8016720Z Mar 14 04:11:09 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 2022-03-14T04:11:09.8017116Z Mar 14 04:11:09 2022-03-14T04:11:09.8017626Z Mar 14 04:11:09 answer = 'xro2722' 2022-03-14T04:11:09.8018127Z Mar 14 04:11:09 gateway_client = 2022-03-14T04:11:09.8018812Z Mar 14 04:11:09
[jira] [Created] (FLINK-26635) KafkaSourceE2ECase.testMultipleSplits failed on azure with Correlation id for response does not match request
Roman Khachatryan created FLINK-26635: - Summary: KafkaSourceE2ECase.testMultipleSplits failed on azure with Correlation id for response does not match request Key: FLINK-26635 URL: https://issues.apache.org/jira/browse/FLINK-26635 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.15.0 Reporter: Roman Khachatryan https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32981=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=14703 {code} 2022-03-14T03:35:34.8710354Z Mar 14 03:35:34 [ERROR] Tests run: 16, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 184.49 s <<< FAILURE! - in org.apache.flink.tests.util.kafka.KafkaSourceE2ECase 2022-03-14T03:35:34.8881150Z Mar 14 03:35:34 [ERROR] org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testMultipleSplits(TestEnvironment, DataStreamSourceExternalContext, CheckpointingMode)[1] Time elapsed: 3.769 s <<< ERROR! 2022-03-14T03:35:34.8882315Z Mar 14 03:35:34 java.lang.RuntimeException: Failed to fetch next result 2022-03-14T03:35:34.8886537Z Mar 14 03:35:34at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) 2022-03-14T03:35:34.8887677Z Mar 14 03:35:34at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) 2022-03-14T03:35:34.672Z Mar 14 03:35:34at org.apache.flink.connector.testframe.utils.CollectIteratorAssert.compareWithExactlyOnceSemantic(CollectIteratorAssert.java:116) 2022-03-14T03:35:34.8889715Z Mar 14 03:35:34at org.apache.flink.connector.testframe.utils.CollectIteratorAssert.matchesRecordsFromSource(CollectIteratorAssert.java:71) 2022-03-14T03:35:34.8890700Z Mar 14 03:35:34at org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.checkResultWithSemantic(SourceTestSuiteBase.java:769) 2022-03-14T03:35:34.8891663Z Mar 14 03:35:34at 
org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.testMultipleSplits(SourceTestSuiteBase.java:218) 2022-03-14T03:35:34.8892432Z Mar 14 03:35:34at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2022-03-14T03:35:34.8893159Z Mar 14 03:35:34at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 2022-03-14T03:35:34.8893940Z Mar 14 03:35:34at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2022-03-14T03:35:34.8894691Z Mar 14 03:35:34at java.lang.reflect.Method.invoke(Method.java:498) 2022-03-14T03:35:34.8895694Z Mar 14 03:35:34at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) 2022-03-14T03:35:34.8896532Z Mar 14 03:35:34at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) 2022-03-14T03:35:34.8897443Z Mar 14 03:35:34at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) 2022-03-14T03:35:34.8898353Z Mar 14 03:35:34at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) 2022-03-14T03:35:34.8899200Z Mar 14 03:35:34at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) 2022-03-14T03:35:34.8901355Z Mar 14 03:35:34at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92) 2022-03-14T03:35:34.8902346Z Mar 14 03:35:34at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) 2022-03-14T03:35:34.8903536Z Mar 14 03:35:34at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) 2022-03-14T03:35:34.8904460Z Mar 14 03:35:34at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) 2022-03-14T03:35:34.8905743Z Mar 14 
03:35:34at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) 2022-03-14T03:35:34.8906674Z Mar 14 03:35:34at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) 2022-03-14T03:35:34.8907606Z Mar 14 03:35:34at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) 2022-03-14T03:35:34.8908447Z Mar 14 03:35:34at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) 2022-03-14T03:35:34.8909283Z Mar 14 03:35:34at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) 2022-03-14T03:35:34.8910366Z Mar 14 03:35:34at
[jira] [Created] (FLINK-26632) JobManagerHAProcessFailureRecoveryITCase failed due to JVM exits with code 239
Roman Khachatryan created FLINK-26632: - Summary: JobManagerHAProcessFailureRecoveryITCase failed due to JVM exits with code 239 Key: FLINK-26632 URL: https://issues.apache.org/jira/browse/FLINK-26632 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.15.0 Reporter: Roman Khachatryan [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32979=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=5486] {code} Mar 14 02:32:25 [ERROR] Error occurred in starting fork, check output in log Mar 14 02:32:25 [ERROR] Process Exit Code: 239 Mar 14 02:32:25 [ERROR] Crashed tests: Mar 14 02:32:25 [ERROR] org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase Mar 14 02:32:25 [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:669) Mar 14 02:32:25 [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.access$600(ForkStarter.java:115) Mar 14 02:32:25 [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter$2.call(ForkStarter.java:444) Mar 14 02:32:25 [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter$2.call(ForkStarter.java:420) Mar 14 02:32:25 [ERROR] at java.util.concurrent.FutureTask.run(FutureTask.java:266) Mar 14 02:32:25 [ERROR] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) Mar 14 02:32:25 [ERROR] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) Mar 14 02:32:25 [ERROR] at java.lang.Thread.run(Thread.java:748) Mar 14 02:32:25 [ERROR] -> [Help 1] {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26615) BatchingStateChangeUploadSchedulerTest.testRetryOnTimeout fails on azure
Roman Khachatryan created FLINK-26615: - Summary: BatchingStateChangeUploadSchedulerTest.testRetryOnTimeout fails on azure Key: FLINK-26615 URL: https://issues.apache.org/jira/browse/FLINK-26615 Project: Flink Issue Type: Bug Components: Runtime / State Backends, Tests Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32896=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24724] {code} [ERROR] Tests run: 10, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.103 s <<< FAILURE! - in org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest [ERROR] org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.testRetryOnTimeout Time elapsed: 0.042 s <<< FAILURE! java.lang.AssertionError: expected:<[0]> but was:<[]> at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.failNotEquals(Assert.java:835) at org.junit.Assert.assertEquals(Assert.java:120) at org.junit.Assert.assertEquals(Assert.java:146) at org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.testRetryOnTimeout(BatchingStateChangeUploadSchedulerTest.java:240) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at org.junit.runner.JUnitCore.run(JUnitCore.java:115) at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26592) [Changelog] Deadlock in FsStateChangelogWriter
Roman Khachatryan created FLINK-26592: - Summary: [Changelog] Deadlock in FsStateChangelogWriter Key: FLINK-26592 URL: https://issues.apache.org/jira/browse/FLINK-26592 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.16.0 The issue occurs when buffer sizes are set to the minimum (e.g. 1 byte). The task thread tries to update state -> schedules an upload of the changes -> waits for capacity. Upload threads do release capacity on upload completion; however, they cannot send the results back because the writer lock is held by the task thread, and are therefore unable to proceed with the next uploads. -- This message was sent by Atlassian Jira (v8.20.1#820001)
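The blocking pattern described above can be sketched with plain Python threads. This is a minimal illustration of the lock-ordering problem, not the actual FsStateChangelogWriter code; `writer_lock`, `capacity`, and `uploader` are hypothetical stand-ins, and timeouts are used so the sketch terminates instead of hanging:

```python
import threading

writer_lock = threading.Lock()      # stands in for the Writer's lock
capacity = threading.Semaphore(0)   # upload budget exhausted (tiny buffers)
deadlocked = []

def uploader():
    # The upload itself is done; handing the result back needs the writer
    # lock, which the task thread holds while waiting for capacity below.
    if writer_lock.acquire(timeout=0.2):
        capacity.release()          # would free capacity for the next upload
        writer_lock.release()
    else:
        deadlocked.append(True)     # hand-off blocked -> next uploads stall

t = threading.Thread(target=uploader)
with writer_lock:                   # task thread updates state under the lock
    t.start()
    # ...schedules an upload, then waits for capacity while STILL holding
    # the lock, so the uploader can never complete the result hand-off.
    got_capacity = capacity.acquire(timeout=0.5)
t.join()
# Both sides time out: got_capacity is False and deadlocked == [True].
```

Releasing the writer lock before waiting for capacity (or delivering results without taking it) breaks the cycle, which is the shape of fix such deadlocks usually require.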
[jira] [Created] (FLINK-26591) Compilation fails due to RawToBinaryCastRule
Roman Khachatryan created FLINK-26591: - Summary: Compilation fails due to RawToBinaryCastRule Key: FLINK-26591 URL: https://issues.apache.org/jira/browse/FLINK-26591 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32857=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=6288 {code} 2022-03-10T15:15:36.8803533Z [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) on project flink-table-planner_2.12: Compilation failure 2022-03-10T15:15:36.8805068Z [ERROR] /__w/3/s/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RawToBinaryCastRule.java:[46,5] method does not override or implement a method from a supertype {code} cc: [~slinkydeveloper] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26590) Triggered checkpoints can be delayed by discarding shared state
Roman Khachatryan created FLINK-26590: - Summary: Triggered checkpoints can be delayed by discarding shared state Key: FLINK-26590 URL: https://issues.apache.org/jira/browse/FLINK-26590 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.15.0, 1.14.3 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.16.0 Quick note: CheckpointCleaner is not involved here. When a checkpoint is subsumed, SharedStateRegistry schedules its unused shared state for async deletion. It uses the common IO pool for this and adds a Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete). When a checkpoint is started, CheckpointCoordinator uses the same thread pool to initialize the location for it (see CheckpointCoordinator.initializeCheckpoint). The thread pool has a fixed size ([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size]; by default the number of CPU cores) and uses a FIFO queue for tasks. When there is a spike in state deletion, the next checkpoint is delayed waiting for an available IO thread. Back-pressure seems reasonable here (similar to CheckpointCleaner); however, this shared state deletion could be spread across multiple subsequent checkpoints, not necessarily the next one. I believe the issue is a pre-existing one, but it particularly affects the changelog state backend, because 1) such spikes are likely there; 2) workloads are latency-sensitive. In the tests, checkpoint duration grows from seconds to minutes immediately after the materialization. -- This message was sent by Atlassian Jira (v8.20.1#820001)
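The head-of-line blocking described above can be illustrated with a single-threaded FIFO executor. This is a hedged sketch: the pool size, task counts, and sleep durations are made up for the illustration and do not reflect Flink's actual JM IO pool:

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the JM IO pool; sized 1 here so the FIFO queue is obvious.
io_pool = ThreadPoolExecutor(max_workers=1)
start = time.monotonic()

# A spike of shared-state deletions, one Runnable per state handle...
for _ in range(5):
    io_pool.submit(time.sleep, 0.1)

# ...queued ahead of the checkpoint-location initialization submitted next.
init_started_at = io_pool.submit(lambda: time.monotonic() - start).result()
io_pool.shutdown()
# The "checkpoint" task waited behind roughly 0.5s of deletions in the queue.
```

With a real pool sized to the CPU count the effect is the same once a deletion spike exceeds the pool size; a separate (or prioritized) executor for checkpoint initialization avoids the queueing.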
[jira] [Created] (FLINK-26485) [Changelog] State not discarded after multiple retries
Roman Khachatryan created FLINK-26485: - Summary: [Changelog] State not discarded after multiple retries Key: FLINK-26485 URL: https://issues.apache.org/jira/browse/FLINK-26485 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26455) [Changelog] Materialization interleaved with task cancellation can fail the job
Roman Khachatryan created FLINK-26455: - Summary: [Changelog] Materialization interleaved with task cancellation can fail the job Key: FLINK-26455 URL: https://issues.apache.org/jira/browse/FLINK-26455 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26396) [Changelog] Upload is not failed even if all attempts timeout
Roman Khachatryan created FLINK-26396: - Summary: [Changelog] Upload is not failed even if all attempts timeout Key: FLINK-26396 URL: https://issues.apache.org/jira/browse/FLINK-26396 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26372) Allow Changelog Storage configuration per program
Roman Khachatryan created FLINK-26372: - Summary: Allow Changelog Storage configuration per program Key: FLINK-26372 URL: https://issues.apache.org/jira/browse/FLINK-26372 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration, Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.16.0 It's currently possible to override state.backend.changelog.enabled per job, but it's not possible to override Changelog storage (i.e. writer) settings. There should be 1) an API and 2) runtime support. See this [discussion|https://github.com/apache/flink/pull/16341#discussion_r663749681] and the corresponding [TODO|https://github.com/apache/flink/pull/16341/files#diff-2c21555dcab689ec27c0ab981852a2bfa787695fb2fe04b24c22b89c63d98b73R680]. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26306) Triggered checkpoints can be delayed by discarding shared state
Roman Khachatryan created FLINK-26306: - Summary: Triggered checkpoints can be delayed by discarding shared state Key: FLINK-26306 URL: https://issues.apache.org/jira/browse/FLINK-26306 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.14.3, 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 Quick note: CheckpointCleaner is not involved here. When a checkpoint is subsumed, SharedStateRegistry schedules its unused shared state for async deletion. It uses the common IO pool for this and adds a Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete). When a checkpoint is started, CheckpointCoordinator uses the same thread pool to initialize the location for it (see CheckpointCoordinator.initializeCheckpoint). The thread pool has a fixed size ([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size]; by default the number of CPU cores) and uses a FIFO queue for tasks. When there is a spike in state deletion, the next checkpoint is delayed waiting for an available IO thread. I believe the issue is an old one, but it particularly affects the changelog state backend, because 1) such spikes are likely; 2) workloads are latency-sensitive. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26255) SplitAggregateITCase.testAggWithJoin failed on azure
Roman Khachatryan created FLINK-26255: - Summary: SplitAggregateITCase.testAggWithJoin failed on azure Key: FLINK-26255 URL: https://issues.apache.org/jira/browse/FLINK-26255 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31850=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=10497] {code:java} [ERROR] Tests run: 64, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 700.545 s <<< FAILURE! - in org.apache.flink.table.planner.runtime.stream.sql.SplitAggregateITCase [ERROR] SplitAggregateITCase.testAggWithJoin Time elapsed: 601.77 s <<< ERROR! org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) at akka.dispatch.OnComplete.internal(Future.scala:300) at akka.dispatch.OnComplete.internal(Future.scala:297) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26231) [Changelog] Incorrect MaterializationID passed to ChangelogStateBackendHandleImpl
Roman Khachatryan created FLINK-26231: - Summary: [Changelog] Incorrect MaterializationID passed to ChangelogStateBackendHandleImpl Key: FLINK-26231 URL: https://issues.apache.org/jira/browse/FLINK-26231 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 In the ChangelogStateBackendHandleImpl constructor, the materializationID and persistedSizeOfThisCheckpoint arguments are mixed up. cc: [~yunta] -- This message was sent by Atlassian Jira (v8.20.1#820001)
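This bug class (two same-typed constructor parameters transposed at a call site) can be sketched generically; the class and field names below are hypothetical, not Flink's:

```python
class Handle:
    # Two integer parameters of the same type: nothing stops a caller from
    # passing them in the wrong order, and no type checker will notice.
    def __init__(self, materialization_id: int, persisted_size: int):
        self.materialization_id = materialization_id
        self.persisted_size = persisted_size

mat_id, size = 7, 4096
broken = Handle(size, mat_id)   # arguments transposed -- silently "works"
fixed = Handle(materialization_id=mat_id, persisted_size=size)
```

Keyword arguments (or distinct wrapper types in Java) make such transpositions fail loudly instead of producing a handle with swapped fields.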
[jira] [Created] (FLINK-26198) ArchitectureTest fails on AZP (table.api.StatementSet)
Roman Khachatryan created FLINK-26198: - Summary: ArchitectureTest fails on AZP (table.api.StatementSet) Key: FLINK-26198 URL: https://issues.apache.org/jira/browse/FLINK-26198 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Roman Khachatryan https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31681=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=26849 {code} [INFO] Running org.apache.flink.architecture.rules.ApiAnnotationRules [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 48.583 s <<< FAILURE! - in org.apache.flink.architecture.rules.ApiAnnotationRules [ERROR] ApiAnnotationRules.PUBLIC_EVOLVING_API_METHODS_USE_ONLY_PUBLIC_EVOLVING_API_TYPES Time elapsed: 0.282 s <<< FAILURE! java.lang.AssertionError: Architecture Violation [Priority: MEDIUM] - Rule 'Return and argument types of methods annotated with @PublicEvolving must be annotated with @Public(Evolving).' was violated (1 times): org.apache.flink.table.api.StatementSet.compilePlan(): Returned leaf type org.apache.flink.table.api.CompiledPlan does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26165) SavepointFormatITCase fails on azure
Roman Khachatryan created FLINK-26165: - Summary: SavepointFormatITCase fails on azure Key: FLINK-26165 URL: https://issues.apache.org/jira/browse/FLINK-26165 Project: Flink Issue Type: Bug Reporter: Roman Khachatryan -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26154) SavepointFormatITCase fails on azure
Roman Khachatryan created FLINK-26154: - Summary: SavepointFormatITCase fails on azure Key: FLINK-26154 URL: https://issues.apache.org/jira/browse/FLINK-26154 Project: Flink Issue Type: Bug Components: Runtime / State Backends, Tests Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 Originally reported in FLINK-26144. [https://dev.azure.com/mapohl/flink/_build/results?buildId=738=logs=0a15d512-44ac-5ba5-97ab-13a5d066c22c=9a028d19-6c4b-5a4e-d378-03fca149d0b1=6340] {code} Feb 15 01:26:50 [ERROR] Tests run: 16, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 591.027 s <<< FAILURE! - in org.apache.flink.test.checkpointing.SavepointFormatITCase Feb 15 01:26:50 [ERROR] org.apache.flink.test.checkpointing.SavepointFormatITCase.testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath(SavepointFormatType, StateBackendConfig)[2] Time elapsed: 261.901 s <<< ERROR! Feb 15 01:26:50 java.util.concurrent.TimeoutException: Condition was not met in given timeout. Feb 15 01:26:50 at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:166) Feb 15 01:26:50 at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:144) Feb 15 01:26:50 at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:136) Feb 15 01:26:50 at org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:210) Feb 15 01:26:50 at org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:184) Feb 15 01:26:50 at org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:172) Feb 15 01:26:50 at org.apache.flink.test.checkpointing.SavepointFormatITCase.relocateAndVerify(SavepointFormatITCase.java:306) Feb 15 01:26:50 at 
org.apache.flink.test.checkpointing.SavepointFormatITCase.testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath(SavepointFormatITCase.java:260) Feb 15 01:26:50 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Feb 15 01:26:50 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) Feb 15 01:26:50 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Feb 15 01:26:50 at java.lang.reflect.Method.invoke(Method.java:498) Feb 15 01:26:50 at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) Feb 15 01:26:50 at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) Feb 15 01:26:50 at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) Feb 15 01:26:50 at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) Feb 15 01:26:50 at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) Feb 15 01:26:50 at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92) Feb 15 01:26:50 at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) Feb 15 01:26:50 at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) Feb 15 01:26:50 at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) Feb 15 01:26:50 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) Feb 15 01:26:50 at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) Feb 15 01:26:50 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) Feb 15 01:26:50 at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26144) SavepointFormatITCase did NOT fail with changelog.enabled randomized
Roman Khachatryan created FLINK-26144: - Summary: SavepointFormatITCase did NOT fail with changelog.enabled randomized Key: FLINK-26144 URL: https://issues.apache.org/jira/browse/FLINK-26144 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 The test validates the correct types of state handles created by a savepoint. For NATIVE savepoints, it expects IncrementalRemoteKeyedStateHandle and KeyGroupsStateHandle. However, with changelog those will be wrapped into ChangelogStateBackendHandle and the test fails. It can be refactored to account for changelog. cc: [~dwysakowicz], Another issue is that it does NOT fail on master, where checkpointing.changelog=random in pom.xml should call {code:java} randomize(conf, ENABLE_STATE_CHANGE_LOG, true, false);{code} If I leave only a single "true" option to randomize(), it does fail. If confirmed, I'll open a new ticket for that. cc: [~arvid] -- This message was sent by Atlassian Jira (v8.20.1#820001)
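Why the randomization can mask the failure is easy to see from a sketch of the helper (a hypothetical stand-in, not Flink's actual randomize() implementation): with both true and false as options, a given CI run may simply never enable the changelog.

```python
import random

def randomize(conf: dict, key: str, *options):
    # Hypothetical sketch of the test helper: picks one option per run.
    conf[key] = random.choice(options)

conf = {}
randomize(conf, "state.backend.changelog.enabled", True, False)
# Roughly half of all runs leave the feature disabled, so a changelog-only
# failure can hide for many builds; passing only True always exercises it.
```

Pinning the option list to a single value, as the ticket describes, removes the coin flip and reproduces the failure deterministically.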
[jira] [Created] (FLINK-26092) JsonAggregationFunctionsITCase fails with NPE when using RocksDB
Roman Khachatryan created FLINK-26092: - Summary: JsonAggregationFunctionsITCase fails with NPE when using RocksDB Key: FLINK-26092 URL: https://issues.apache.org/jira/browse/FLINK-26092 Project: Flink Issue Type: Bug Components: Runtime / State Backends, Tests Affects Versions: 1.14.3, 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan With the RocksDB backend chosen manually (instead of Heap, e.g. by altering the mini-cluster configuration in BuiltInAggregateFunctionTestBase), the test fails with an NPE. Not sure whether it's a RocksDB issue, a test issue, or not an issue at all. The current Changelog backend behavior mimics RocksDB, and therefore enabling it with materialization fails the test too (Changelog + Heap). {code} java.lang.RuntimeException: Could not collect results at org.apache.flink.table.planner.functions.BuiltInAggregateFunctionTestBase.materializeResult(BuiltInAggregateFunctionTestBase.java:169) at org.apache.flink.table.planner.functions.BuiltInAggregateFunctionTestBase.assertRows(BuiltInAggregateFunctionTestBase.java:133) at org.apache.flink.table.planner.functions.BuiltInAggregateFunctionTestBase$SuccessItem.execute(BuiltInAggregateFunctionTestBase.java:279) at org.apache.flink.table.planner.functions.BuiltInAggregateFunctionTestBase.testFunction(BuiltInAggregateFunctionTestBase.java:93) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) Caused by: java.lang.RuntimeException: Failed to fetch next result at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:216) at java.util.Iterator.forEachRemaining(Iterator.java:115) at org.apache.flink.table.planner.functions.BuiltInAggregateFunctionTestBase.materializeResult(BuiltInAggregateFunctionTestBase.java:150) ... 38 more Caused by: java.io.IOException: Failed to fetch job execution result at
[jira] [Created] (FLINK-26079) Disallow combination of Changelog backend and CLAIM restore mode
Roman Khachatryan created FLINK-26079: - Summary: Disallow combination of Changelog backend and CLAIM restore mode Key: FLINK-26079 URL: https://issues.apache.org/jira/browse/FLINK-26079 Project: Flink Issue Type: Bug Components: Runtime / Configuration, Runtime / State Backends Reporter: Roman Khachatryan Fix For: 1.15.0 Extracted from FLINK-25872. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26063) [Changelog] Incorrect key group logged for PQ.poll and remove
Roman Khachatryan created FLINK-26063: - Summary: [Changelog] Incorrect key group logged for PQ.poll and remove Key: FLINK-26063 URL: https://issues.apache.org/jira/browse/FLINK-26063 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 The key group is logged so that state changes can be re-distributed or shuffled. It is currently obtained from keyContext during poll() and remove() operations. However, keyContext is not updated when dequeuing processing time timers. The impact is relatively small for remove(): in the worst case, the operation will be ignored. poll() should probably be replaced with remove() anyway - see FLINK-26062. One way to solve this problem is to extract the key group from the polled element, if it is a timer. cc: [~masteryhx], [~ym], [~yunta] -- This message was sent by Atlassian Jira (v8.20.1#820001)
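The fix suggested in the ticket above, deriving the key group from the polled element itself instead of from a possibly stale keyContext, can be sketched as follows. This is a minimal illustration under stated assumptions, not Flink's actual code: the Timer type, keyGroupOf, and the key-group formula are simplified stand-ins (Flink's real assignment lives in KeyGroupRangeAssignment and uses a murmur hash).

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch: when logging a poll(), compute the key group from
// the polled timer's own key rather than reading the ambient keyContext,
// which may be stale when processing-time timers are dequeued.
class TimerChangeLogger {
    static final int MAX_PARALLELISM = 128;

    record Timer(String key, long timestamp) {}

    // Simplified stand-in for Flink's KeyGroupRangeAssignment.assignToKeyGroup.
    static int keyGroupOf(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    // Poll the head timer and return the key group the change should be
    // logged under: the group of the element itself.
    static int logPollAsRemove(PriorityQueue<Timer> queue) {
        Timer polled = queue.poll();
        return keyGroupOf(polled.key()); // correct group even if keyContext is stale
    }

    static PriorityQueue<Timer> newQueue() {
        return new PriorityQueue<>(Comparator.comparingLong(Timer::timestamp));
    }
}
```

As the ticket notes, a real implementation would apply this only when the polled element actually is a timer.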
[jira] [Created] (FLINK-26062) [Changelog] Non-deterministic recovery of PriorityQueue states
Roman Khachatryan created FLINK-26062: - Summary: [Changelog] Non-deterministic recovery of PriorityQueue states Key: FLINK-26062 URL: https://issues.apache.org/jira/browse/FLINK-26062 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 Currently, InternalPriorityQueue.poll() is logged as a separate operation, without specifying the element that has been polled. On recovery, this recorded poll() is replayed. However, this is not deterministic because the order of PQ elements with equal priority is not specified. For example, TimerHeapInternalTimer only compares timestamps, which are often equal. This results in polling timers from the queue in the wrong order => dropping timers => not firing timers. ProcessingTimeWindowCheckpointingITCase.testAggregatingSlidingProcessingTimeWindow fails with materialization enabled and using the heap state backend (both in-memory and fs-based implementations). The proposed solution is to replace poll with a remove operation (which is based on equality). cc: [~masteryhx], [~ym], [~yunta] -- This message was sent by Atlassian Jira (v8.20.1#820001)
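The non-determinism described above can be reproduced with a plain java.util.PriorityQueue whose comparator treats all elements as equal, mimicking TimerHeapInternalTimer comparing equal timestamps: which element poll() returns depends on the heap's internal layout, while remove(element) is equality-based and therefore stable. This is a generic sketch, not Flink's InternalPriorityQueue:

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Demo: with an all-equal comparator, the element returned by poll() is
// unspecified (it depends on insertion order / heap layout), so replaying
// a recorded "poll" on recovery is non-deterministic. Replaying a recorded
// "remove(element)" removes exactly the intended element.
class PollReplayDemo {
    // Replay a recorded poll(): returns whichever element sits at the heap root.
    static String replayPoll(String first, String second) {
        PriorityQueue<String> q = new PriorityQueue<>(Comparator.comparingInt(s -> 0));
        q.add(first);
        q.add(second);
        return q.poll(); // unspecified among equal-priority elements
    }

    // Replay a recorded remove(element): deterministic, equality-based.
    static boolean replayRemove(String first, String second, String recorded) {
        PriorityQueue<String> q = new PriorityQueue<>(Comparator.comparingInt(s -> 0));
        q.add(first);
        q.add(second);
        return q.remove(recorded) && !q.contains(recorded);
    }
}
```

The remove-based replay succeeds regardless of the order in which the equal-priority elements were inserted, which is exactly why the ticket proposes it.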
[jira] [Created] (FLINK-26019) Changelogged PriorityQueue elements recovered out-of-order
Roman Khachatryan created FLINK-26019: - Summary: Changelogged PriorityQueue elements recovered out-of-order Key: FLINK-26019 URL: https://issues.apache.org/jira/browse/FLINK-26019 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 StateChangeFormat is the class responsible for writing out changelog data. Each chunk of data is sorted by: logId -> sequenceNumber -> keyGroup. Sorting by sequenceNumber preserves temporal order. Sorting by keyGroup a) puts metadata (group -1) at the beginning and b) allows writing the KG only once. However, the assumption that the order of changes across key groups doesn't matter currently doesn't hold: the poll operation of InternalPriorityQueue may affect any group (the smallest item across all groups will be polled). This results in wrong processing time timers being removed on recovery in ProcessingTimeWindowCheckpointingITCase#testAggregatingSlidingProcessingTimeWindow One way to solve this problem is to simply disable KG-sorting and grouping (only output metadata at the beginning). The other is to associate the polled element with the correct key group while logging changes. Both ways should work with re-scaling. cc: [~masteryhx], [~ym], [~yunta] -- This message was sent by Atlassian Jira (v8.20.1#820001)
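The sort order described above (logId -> sequenceNumber -> keyGroup, with the metadata group -1 coming first within a sequence number) can be sketched with a composed comparator. The Change record and its field names are illustrative assumptions, not the actual StateChangeFormat types:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the chunk ordering: sequenceNumber keeps temporal order within a
// log, and keyGroup puts metadata (group -1) first within a sequence number,
// allowing the key group to be written once per run of changes.
class ChangeSortDemo {
    record Change(int logId, long sequenceNumber, int keyGroup) {}

    static final Comparator<Change> ORDER =
            Comparator.comparingInt(Change::logId)
                    .thenComparingLong(Change::sequenceNumber)
                    .thenComparingInt(Change::keyGroup);

    // Return a sorted copy of one chunk of changes.
    static List<Change> sorted(List<Change> changes) {
        List<Change> copy = new ArrayList<>(changes);
        copy.sort(ORDER);
        return copy;
    }
}
```

Note that this ordering is exactly what the ticket questions: a poll may belong to any key group, so reordering changes by keyGroup can replay a removal against the wrong element.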
[jira] [Created] (FLINK-25992) JobDispatcherITCase..testRecoverFromCheckpointAfterLosingAndRegainingLeadership fails on azure
Roman Khachatryan created FLINK-25992: - Summary: JobDispatcherITCase..testRecoverFromCheckpointAfterLosingAndRegainingLeadership fails on azure Key: FLINK-25992 URL: https://issues.apache.org/jira/browse/FLINK-25992 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30871=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9154 {code} 19:41:35,515 [flink-akka.actor.default-dispatcher-9] WARN org.apache.flink.runtime.taskmanager.Task[] - jobVertex (1/1)#0 (7efdea21f5f95490e02117063ce8a314) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: Error while notify checkpoint ABORT. at org.apache.flink.runtime.taskmanager.Task.notifyCheckpoint(Task.java:1457) at org.apache.flink.runtime.taskmanager.Task.notifyCheckpointAborted(Task.java:1407) at org.apache.flink.runtime.taskexecutor.TaskExecutor.abortCheckpoint(TaskExecutor.java:1021) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) Caused by: java.lang.UnsupportedOperationException: notifyCheckpointAbortAsync not supported by org.apache.flink.runtime.dispatcher.JobDispatcherITCase$AtLeastOneCheckpointInvokable at org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable.notifyCheckpointAbortAsync(AbstractInvokable.java:205) at org.apache.flink.runtime.taskmanager.Task.notifyCheckpoint(Task.java:1430) ... 31 more {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25987) IllegalArgumentException thrown from FsStateChangelogWriter.truncate
Roman Khachatryan created FLINK-25987: - Summary: IllegalArgumentException thrown from FsStateChangelogWriter.truncate Key: FLINK-25987 URL: https://issues.apache.org/jira/browse/FLINK-25987 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 {code} java.lang.IllegalArgumentException at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122) at org.apache.flink.changelog.fs.FsStateChangelogWriter.truncate(FsStateChangelogWriter.java:278) at org.apache.flink.state.changelog.ChangelogKeyedStateBackend.updateChangelogSnapshotState(ChangelogKeyedStateBackend.java:702) at org.apache.flink.state.changelog.PeriodicMaterializationManager.lambda$null$2(PeriodicMaterializationManager.java:163) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:750){code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25871) "License Check failed" on AZP
Roman Khachatryan created FLINK-25871: - Summary: "License Check failed" on AZP Key: FLINK-25871 URL: https://issues.apache.org/jira/browse/FLINK-25871 Project: Flink Issue Type: Bug Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30405=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=25983] {code} 2022-01-28T16:00:43.2845037Z Invoking mvn with 'mvn -Dmaven.repo.local=/__w/1/.m2/repository -Dmaven.wagon.http.pool=false -Dorg.slf4j.simpleLogger.showDateTime=true -Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss.SSS -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn --no-snapshot-updates -B -Dhadoop.version=2.8.5 -Dinclude_hadoop_aws -Dscala-2.12 --settings /__w/1/s/tools/ci/google-mirror-settings.xml exec:java -Dexec.mainClass=org.apache.flink.tools.ci.licensecheck.LicenseChecker -Dexec.args="/tmp/clean_compile.out /__w/1/s /tmp/flink-validation-deployment"' 2022-01-28T16:00:44.2455332Z [INFO] Scanning for projects... 2022-01-28T16:00:45.4125838Z [INFO] 2022-01-28T16:00:45.4127042Z [INFO] 2022-01-28T16:00:45.4134357Z [INFO] Building Flink : Tools : CI : Java 1.15-SNAPSHOT 2022-01-28T16:00:45.4135024Z [INFO] 2022-01-28T16:00:45.6475421Z [INFO] 2022-01-28T16:00:45.6482266Z [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ java-ci-tools --- 2022-01-28T16:00:46.4578710Z 16:00:46,454 WARN org.apache.flink.tools.ci.licensecheck.LicenseChecker[] - THIS UTILITY IS ONLY CHECKING FOR COMMON LICENSING MISTAKES. A MANUAL CHECK OF THE NOTICE FILES, DEPLOYED ARTIFACTS, ETC. IS STILL NEEDED! 
2022-01-28T16:00:46.4654764Z 16:00:46,464 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Loaded 21 items from resource modules-skipping-deployment.modulelist 2022-01-28T16:00:46.4664700Z 16:00:46,465 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Loaded 7 items from resource modules-defining-excess-dependencies.modulelist 2022-01-28T16:00:50.3720798Z 16:00:50,370 INFO org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Extracted 45 modules with a total of 649 dependencies 2022-01-28T16:00:51.8055495Z 16:00:51,804 INFO org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Found 43 NOTICE files to check 2022-01-28T16:00:51.9879879Z 16:00:51,984 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency org.jodd:jodd-core:3.5.2 is mentioned in NOTICE file /__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE, but was not mentioned by the build output as a bundled dependency 2022-01-28T16:00:51.9886753Z 16:00:51,985 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency org.apache.hive:hive-storage-api:2.7.0 is mentioned in NOTICE file /__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE, but was not mentioned by the build output as a bundled dependency 2022-01-28T16:00:51.9913405Z 16:00:51,985 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency org.objenesis:objenesis:2.1 is mentioned in NOTICE file /__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE, but was not mentioned by the build output as a bundled dependency 2022-01-28T16:00:51.9929357Z 16:00:51,986 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency org.apache.hive.shims:hive-shims-common:3.1.2 is mentioned in NOTICE file /__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE, but was not mentioned by 
the build output as a bundled dependency 2022-01-28T16:00:51.9932281Z 16:00:51,986 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency org.apache.hive:hive-common:3.1.2 is mentioned in NOTICE file /__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE, but was not mentioned by the build output as a bundled dependency 2022-01-28T16:00:51.9934561Z 16:00:51,986 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency org.apache.hive:hive-serde:3.1.2 is mentioned in NOTICE file /__w/1/s/flink-connectors/flink-sql-connector-hive-3.1.2/src/main/resources/META-INF/NOTICE, but was not mentioned by the build output as a bundled dependency 2022-01-28T16:00:51.9936752Z 16:00:51,986 DEBUG org.apache.flink.tools.ci.licensecheck.NoticeFileChecker [] - Dependency
[jira] [Created] (FLINK-25867) [ZH] Add ChangelogBackend documentation
Roman Khachatryan created FLINK-25867: - Summary: [ZH] Add ChangelogBackend documentation Key: FLINK-25867 URL: https://issues.apache.org/jira/browse/FLINK-25867 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 Currently, the changelog backend is not covered in the user documentation. Once the feature is ready, the following needs to be documented: * General description (page [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/] ) * Configuration (page [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/] - StateChangelogOptions, FsStateChangelogOptions) * Uploader metrics (page [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/] , see FLINK-23486) -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25850) Consider notifying nested state backend about checkpoint abortion
Roman Khachatryan created FLINK-25850: - Summary: Consider notifying nested state backend about checkpoint abortion Key: FLINK-25850 URL: https://issues.apache.org/jira/browse/FLINK-25850 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Roman Khachatryan Fix For: 1.16.0 The notification is optional, but some backends might do GC upon receiving it. {code} These notifications are "best effort", meaning they can sometimes be skipped. This method is very rarely necessary to implement. {code} The usefulness is also limited by: - the low probability of the notification reaching the backend, because of the difference in intervals and the cleanup on checkpoint completion - the low probability of backends making good use of it, because it is delivered after the snapshot is done; backends must be resilient to missing notifications anyway There is added complexity and risk (such as FLINK-25816). The complexity can probably be eliminated by extracting a Notifier class from ChangelogStateBackend. cc: [~yunta] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25842) [v2] FLIP-158: Generalized incremental checkpoints
Roman Khachatryan created FLINK-25842: - Summary: [v2] FLIP-158: Generalized incremental checkpoints Key: FLINK-25842 URL: https://issues.apache.org/jira/browse/FLINK-25842 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Reporter: Roman Khachatryan Fix For: 1.16.0 Umbrella ticket for the 2nd iteration of [FLIP-158: Generalized incremental checkpoints|https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25825) MySqlCatalogITCase fails on azure
Roman Khachatryan created FLINK-25825: - Summary: MySqlCatalogITCase fails on azure Key: FLINK-25825 URL: https://issues.apache.org/jira/browse/FLINK-25825 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30189=logs=e9af9cde-9a65-5281-a58e-2c8511d36983=c520d2c3-4d17-51f1-813b-4b0b74a0c307=13677 {code} 2022-01-26T06:04:42.8019913Z Jan 26 06:04:42 [ERROR] org.apache.flink.connector.jdbc.catalog.MySqlCatalogITCase.testFullPath Time elapsed: 2.166 s <<< FAILURE! 2022-01-26T06:04:42.8025522Z Jan 26 06:04:42 java.lang.AssertionError: expected: java.util.ArrayList<[+I[1, -1, 1, null, true, null, hello, 2021-08-04, 2021-08-04T01:54:16, -1, 1, -1.0, 1.0, enum2, -9.1, 9.1, -1, 1, -1, 1, \{"k1": "v1"}, null, col_longtext, null, -1, 1, col_mediumtext, -99, 99, -1.0, 1.0, set_ele1, -1, 1, col_text, 10:32:34, 2021-08-04T01:54:16, col_tinytext, -1, 1, null, col_varchar, 2021-08-04T01:54:16.463, 09:33:43, 2021-08-04T01:54:16.463, null], +I[2, -1, 1, null, true, null, hello, 2021-08-04, 2021-08-04T01:53:19, -1, 1, -1.0, 1.0, enum2, -9.1, 9.1, -1, 1, -1, 1, \{"k1": "v1"}, null, col_longtext, null, -1, 1, col_mediumtext, -99, 99, -1.0, 1.0, set_ele1,set_ele12, -1, 1, col_text, 10:32:34, 2021-08-04T01:53:19, col_tinytext, -1, 1, null, col_varchar, 2021-08-04T01:53:19.098, 09:33:43, 2021-08-04T01:53:19.098, null]]> but was: java.util.ArrayList<[+I[1, -1, 1, null, true, null, hello, 2021-08-04, 2021-08-04T01:54:16, -1, 1, -1.0, 1.0, enum2, -9.1, 9.1, -1, 1, -1, 1, \{"k1": "v1"}, null, col_longtext, null, -1, 1, col_mediumtext, -99, 99, -1.0, 1.0, set_ele1, -1, 1, col_text, 10:32:34, 2021-08-04T01:54:16, col_tinytext, -1, 1, null, col_varchar, 2021-08-04T01:54:16.463, 09:33:43, 2021-08-04T01:54:16.463, null], +I[2, -1, 1, null, true, null, hello, 2021-08-04, 2021-08-04T01:53:19, -1, 1, -1.0, 1.0, enum2, -9.1, 9.1, -1, 1, -1, 1, 
\{"k1": "v1"}, null, col_longtext, null, -1, 1, col_mediumtext, -99, 99, -1.0, 1.0, set_ele1,set_ele12, -1, 1, col_text, 10:32:34, 2021-08-04T01:53:19, col_tinytext, -1, 1, null, col_varchar, 2021-08-04T01:53:19.098, 09:33:43, 2021-08-04T01:53:19.098, null]]> 2022-01-26T06:04:42.8029336Z Jan 26 06:04:42 at org.junit.Assert.fail(Assert.java:89) 2022-01-26T06:04:42.8029824Z Jan 26 06:04:42 at org.junit.Assert.failNotEquals(Assert.java:835) 2022-01-26T06:04:42.8030319Z Jan 26 06:04:42 at org.junit.Assert.assertEquals(Assert.java:120) 2022-01-26T06:04:42.8030815Z Jan 26 06:04:42 at org.junit.Assert.assertEquals(Assert.java:146) 2022-01-26T06:04:42.8031419Z Jan 26 06:04:42 at org.apache.flink.connector.jdbc.catalog.MySqlCatalogITCase.testFullPath(MySqlCatalogITCase.java:306) {code} {code} 2022-01-26T06:04:43.2899378Z Jan 26 06:04:43 [ERROR] Failures: 2022-01-26T06:04:43.2907942Z Jan 26 06:04:43 [ERROR] MySqlCatalogITCase.testFullPath:306 expected: java.util.ArrayList<[+I[1, -1, 1, null, true, 2022-01-26T06:04:43.2914065Z Jan 26 06:04:43 [ERROR] MySqlCatalogITCase.testGetTable:253 expected:<( 2022-01-26T06:04:43.2983567Z Jan 26 06:04:43 [ERROR] MySqlCatalogITCase.testSelectToInsert:323 expected: java.util.ArrayList<[+I[1, -1, 1, null, 2022-01-26T06:04:43.2997373Z Jan 26 06:04:43 [ERROR] MySqlCatalogITCase.testWithoutCatalog:291 expected: java.util.ArrayList<[+I[1, -1, 1, null, 2022-01-26T06:04:43.3010450Z Jan 26 06:04:43 [ERROR] MySqlCatalogITCase.testWithoutCatalogDB:278 expected: java.util.ArrayList<[+I[1, -1, 1, nul {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25824) E2e test phase fails on AZP after "Unable to locate package moreutils"
Roman Khachatryan created FLINK-25824: - Summary: E2e test phase fails on AZP after "Unable to locate package moreutils" Key: FLINK-25824 URL: https://issues.apache.org/jira/browse/FLINK-25824 Project: Flink Issue Type: Bug Components: Test Infrastructure Reporter: Roman Khachatryan Fix For: 1.15.0 [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30209=logs=bea52777-eaf8-5663-8482-18fbc3630e81=b2642e3a-5b86-574d-4c8a-f7e2842bfb14=17] [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30209=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=17] {code:java} E: Unable to locate package moreutils Running command 'flink-end-to-end-tests/run-nightly-tests.sh 1' with a timeout of 287 minutes. ./tools/azure-pipelines/uploading_watchdog.sh: line 76: ts: command not found /home/vsts/work/1/s/flink-end-to-end-tests/../tools/ci/maven-utils.sh: line 96: NPM_PROXY_PROFILE_ACTIVATION: command not found The STDIO streams did not close within 10 seconds of the exit event from process '/usr/bin/bash'. This may indicate a child process inherited the STDIO streams and has not yet exited. ##[error]Bash exited with code '141'. {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25750) Performance regression on 20.01.2021 in globalWindow and stateBackend benchmarks
Roman Khachatryan created FLINK-25750: - Summary: Performance regression on 20.01.2021 in globalWindow and stateBackend benchmarks Key: FLINK-25750 URL: https://issues.apache.org/jira/browse/FLINK-25750 Project: Flink Issue Type: Bug Components: Benchmarks, Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=globalWindow=2=200=off=on=on http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=stateBackends.FS=2=200=off=on=on http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=stateBackends.FS=2=200=off=on=on -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25740) PulsarSourceOrderedE2ECase fails on azure
Roman Khachatryan created FLINK-25740: - Summary: PulsarSourceOrderedE2ECase fails on azure Key: FLINK-25740 URL: https://issues.apache.org/jira/browse/FLINK-25740 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.15.0 Reporter: Roman Khachatryan https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29789=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=16385 {code} 2022-01-20T15:39:52.3915823Z Jan 20 15:39:52 [ERROR] Errors: 2022-01-20T15:39:52.3922501Z Jan 20 15:39:52 [ERROR] PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testIdleReader:187->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence 2022-01-20T15:39:52.3924207Z Jan 20 15:39:52 [ERROR] PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testIdleReader:187->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence 2022-01-20T15:39:52.3925830Z Jan 20 15:39:52 [ERROR] PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testMultipleSplits:145->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence 2022-01-20T15:39:52.3927464Z Jan 20 15:39:52 [ERROR] PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testMultipleSplits:145->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence 2022-01-20T15:39:52.3928743Z Jan 20 15:39:52 [ERROR] PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testSourceSingleSplit:105->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence 2022-01-20T15:39:52.3930029Z Jan 20 15:39:52 [ERROR] PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testSourceSingleSplit:105->SourceTestSuiteBase.generateAndWriteTestData:315 » BrokerPersistence 2022-01-20T15:39:52.3931359Z Jan 20 15:39:52 [ERROR] PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testTaskManagerFailure:232 » BrokerPersistence 2022-01-20T15:39:52.3932353Z Jan 20 15:39:52 [ERROR] PulsarSourceOrderedE2ECase>SourceTestSuiteBase.testTaskManagerFailure:232 » BrokerPersistence 2022-01-20T15:39:52.3933580Z Jan 20 15:39:52 [ERROR] PulsarSourceUnorderedE2ECase>UnorderedSourceTestSuiteBase.testOneSplitWithMultipleConsumers:60 » BrokerPersistence 2022-01-20T15:39:52.3934698Z Jan 20 15:39:52 [ERROR] PulsarSourceUnorderedE2ECase>UnorderedSourceTestSuiteBase.testOneSplitWithMultipleConsumers:60 » BrokerPersistence {code} {code} 2022-01-20T15:28:37.1467261Z Jan 20 15:28:37 [ERROR] org.apache.flink.tests.util.pulsar.PulsarSourceUnorderedE2ECase.testOneSplitWithMultipleConsumers(TestEnvironment, ExternalContext)[2] Time elapsed: 77.698 s <<< ERROR! 
2022-01-20T15:28:37.1469146Z Jan 20 15:28:37 org.apache.pulsar.client.api.PulsarClientException$BrokerPersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException: Not enough non-faulty bookies available 2022-01-20T15:28:37.1470062Z Jan 20 15:28:37at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:985) 2022-01-20T15:28:37.1470802Z Jan 20 15:28:37at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:95) 2022-01-20T15:28:37.1471598Z Jan 20 15:28:37at org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.sendMessages(PulsarRuntimeOperator.java:172) 2022-01-20T15:28:37.1472451Z Jan 20 15:28:37at org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.sendMessages(PulsarRuntimeOperator.java:167) 2022-01-20T15:28:37.1473307Z Jan 20 15:28:37at org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter.writeRecords(PulsarPartitionDataWriter.java:41) 2022-01-20T15:28:37.1474209Z Jan 20 15:28:37at org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase.testOneSplitWithMultipleConsumers(UnorderedSourceTestSuiteBase.java:60) 2022-01-20T15:28:37.1474949Z Jan 20 15:28:37at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2022-01-20T15:28:37.1475658Z Jan 20 15:28:37at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 2022-01-20T15:28:37.1476383Z Jan 20 15:28:37at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2022-01-20T15:28:37.1477030Z Jan 20 15:28:37at java.lang.reflect.Method.invoke(Method.java:498) 2022-01-20T15:28:37.1477670Z Jan 20 15:28:37at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) 2022-01-20T15:28:37.1478388Z Jan 20 15:28:37at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25739) Include dstl-dfs into distribution (opt/)
Roman Khachatryan created FLINK-25739: - Summary: Include dstl-dfs into distribution (opt/) Key: FLINK-25739 URL: https://issues.apache.org/jira/browse/FLINK-25739 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25710) Multiple Kafka IT cases fail with "ContainerLaunch Container startup failed"
Roman Khachatryan created FLINK-25710: - Summary: Multiple Kafka IT cases fail with "ContainerLaunch Container startup failed" Key: FLINK-25710 URL: https://issues.apache.org/jira/browse/FLINK-25710 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Tests Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29731=logs=c5f0071e-1851-543e-9a45-9ac140befc32=15a22db7-8faa-5b34-3920-d33c9f0ca23c=35454 {code} 2022-01-19T18:17:40.3503774Z Jan 19 18:17:40 [INFO] --- 2022-01-19T18:17:42.3992027Z Jan 19 18:17:42 [ERROR] Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError 2022-01-19T18:17:42.9262342Z Jan 19 18:17:42 [INFO] Running org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase 2022-01-19T18:18:47.9992530Z Jan 19 18:18:47 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 65.053 s <<< FAILURE! - in org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase 2022-01-19T18:18:47.9993836Z Jan 19 18:18:47 [ERROR] org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase Time elapsed: 65.053 s <<< ERROR! 2022-01-19T18:18:47.9994507Z Jan 19 18:18:47 org.testcontainers.containers.ContainerLaunchException: Container startup failed ... 2022-01-19T18:18:48.0038449Z Jan 19 18:18:47 Caused by: org.rnorth.ducttape.RetryCountExceededException: Retry limit hit with exception 2022-01-19T18:18:48.0039451Z Jan 19 18:18:47at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:88) 2022-01-19T18:18:48.0040449Z Jan 19 18:18:47at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:329) 2022-01-19T18:18:48.0041204Z Jan 19 18:18:47... 
27 more 2022-01-19T18:18:48.0041993Z Jan 19 18:18:47 Caused by: org.testcontainers.containers.ContainerLaunchException: Could not create/start container 2022-01-19T18:18:48.0043007Z Jan 19 18:18:47at org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:525) 2022-01-19T18:18:48.0044020Z Jan 19 18:18:47at org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:331) 2022-01-19T18:18:48.0045158Z Jan 19 18:18:47at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81) 2022-01-19T18:18:48.0046043Z Jan 19 18:18:47... 28 more 2022-01-19T18:18:48.0047026Z Jan 19 18:18:47 Caused by: org.testcontainers.containers.ContainerLaunchException: Timed out waiting for container po*rt to open (172.17.0.1 ports: [56218, 56219] should be listening) 2022-01-19T18:18:48.0048320Z Jan 19 18:18:47at org.testcontainers.containers.wait.strategy.HostPortWaitStrategy.waitUntilReady(HostPortWaitStr ategy.java:90) 2022-01-19T18:18:48.0049465Z Jan 19 18:18:47at org.testcontainers.containers.wait.strategy.AbstractWaitStrategy.waitUntilReady(AbstractWaitStr ategy.java:51) 2022-01-19T18:18:48.0050585Z Jan 19 18:18:47at org.testcontainers.containers.GenericContainer.waitUntilContainerStarted(GenericContainer.java: 929) 2022-01-19T18:18:48.0051628Z Jan 19 18:18:47at org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:468) 2022-01-19T18:18:48.0052380Z Jan 19 18:18:47... 30 more ... 
2022-01-19T18:40:37.7197924Z Jan 19 18:40:37 [INFO] Results: 2022-01-19T18:40:37.7198526Z Jan 19 18:40:37 [INFO] 2022-01-19T18:40:37.7199093Z Jan 19 18:40:37 [ERROR] Errors: 2022-01-19T18:40:37.7200602Z Jan 19 18:40:37 [ERROR] KafkaSinkITCase » ContainerLaunch Container startup failed 2022-01-19T18:40:37.7201683Z Jan 19 18:40:37 [ERROR] KafkaTransactionLogITCase » ContainerLaunch Container startup failed 2022-01-19T18:40:37.7204632Z Jan 19 18:40:37 [ERROR] KafkaWriterITCase.beforeAll:99 » ContainerLaunch Container startup failed {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25678) TaskExecutorStateChangelogStoragesManager.shutdown is not thread-safe
Roman Khachatryan created FLINK-25678: - Summary: TaskExecutorStateChangelogStoragesManager.shutdown is not thread-safe Key: FLINK-25678 URL: https://issues.apache.org/jira/browse/FLINK-25678 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.14.2, 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0, 1.14.4 [https://github.com/apache/flink/pull/18169#discussion_r785741977] The method is called from the shutdown hook and therefore should be thread-safe. cc: [~Zakelly] , [~dmvk] -- This message was sent by Atlassian Jira (v8.20.1#820001)
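Since the method is invoked both from the TaskExecutor and from a JVM shutdown hook, a minimal sketch of a thread-safe, idempotent shutdown could look as follows; class and method names here are illustrative, not the actual Flink code:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: a manager whose shutdown() may be invoked concurrently
// by the TaskExecutor and by a JVM shutdown hook, so it must be thread-safe
// and idempotent. Not the real TaskExecutorStateChangelogStoragesManager.
class ChangelogStoragesManager {
    private final Map<String, AutoCloseable> storagesByJobId = new HashMap<>();
    private boolean closed = false;

    public synchronized void register(String jobId, AutoCloseable storage) {
        if (closed) {
            throw new IllegalStateException("Manager already shut down");
        }
        storagesByJobId.put(jobId, storage);
    }

    public synchronized void shutdown() {
        if (closed) {
            return; // idempotent: the second caller (hook or executor) is a no-op
        }
        closed = true;
        for (AutoCloseable storage : storagesByJobId.values()) {
            try {
                storage.close();
            } catch (Exception e) {
                // best effort during shutdown; real code would log and continue
            }
        }
        storagesByJobId.clear();
    }
}
```

Synchronizing both registration and shutdown on the same monitor also closes the window where a storage is registered after the hook has already iterated the map.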
[jira] [Created] (FLINK-25598) Changelog materialized state discarded on failure
Roman Khachatryan created FLINK-25598: - Summary: Changelog materialized state discarded on failure Key: FLINK-25598 URL: https://issues.apache.org/jira/browse/FLINK-25598 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 Similar to FLINK-25395: error handling in {{PeriodicMaterializationManager.uploadSnapshot}} discards the uploaded state; however, the wrapped backend might assume the state was uploaded. cc: [~ym], [~yunta] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25395) Incremental shared state might be discarded by TM
Roman Khachatryan created FLINK-25395: - Summary: Incremental shared state might be discarded by TM Key: FLINK-25395 URL: https://issues.apache.org/jira/browse/FLINK-25395 Project: Flink Issue Type: Bug Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 Extracting from [FLINK-25185 discussion|https://issues.apache.org/jira/browse/FLINK-25185?focusedCommentId=17462639=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17462639] On checkpoint abortion or any failure in AsyncCheckpointRunnable, it discards the state, in particular shared (incremental) state. Since FLINK-24611, this creates a problem because shared state can be re-used for future checkpoints. Needs confirmation. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25261) Changelog not truncated on materialization
Roman Khachatryan created FLINK-25261: - Summary: Changelog not truncated on materialization Key: FLINK-25261 URL: https://issues.apache.org/jira/browse/FLINK-25261 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 [https://github.com/apache/flink/blob/dcc4d43e413b20f70036e73c61d52e2e1c5afee7/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java#L640] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25260) Recovery fails when using changelog+s3+presto
Roman Khachatryan created FLINK-25260: - Summary: Recovery fails when using changelog+s3+presto Key: FLINK-25260 URL: https://issues.apache.org/jira/browse/FLINK-25260 Project: Flink Issue Type: Bug Components: Connectors / FileSystem, Runtime / State Backends Reporter: Roman Khachatryan Recovery succeeds if using local FS or hadoop S3 plugin, but fails with Presto: {code} Caused by: java.lang.IllegalStateException at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) at org.apache.flink.changelog.fs.StateChangeFormat$1.readChange(StateChangeFormat.java:138) at org.apache.flink.changelog.fs.StateChangeFormat$1.next(StateChangeFormat.java:129) at org.apache.flink.changelog.fs.StateChangeFormat$1.next(StateChangeFormat.java:98) at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.next(StateChangelogHandleStreamHandleReader.java:76) at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.next(StateChangelogHandleStreamHandleReader.java:61) at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:94) at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:74) at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221) at org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ... 
13 more {code} This is likely caused by some intermediate buffers. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25144) Manual test
Roman Khachatryan created FLINK-25144: - Summary: Manual test Key: FLINK-25144 URL: https://issues.apache.org/jira/browse/FLINK-25144 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Roman Khachatryan Fix For: 1.15.0 Test plan: [https://docs.google.com/document/d/10WVFA0BSR0zrRKRjQbB3iEiC7167MBeAwD6swn_eT24/edit?usp=sharing] The ticket should be split into multiple ones when needed (correctness, performance, usability - see the plan above). cc: [~yunta] , [~ym] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25143) Add ITCase for periodic materialization
Roman Khachatryan created FLINK-25143: - Summary: Add ITCase for periodic materialization Key: FLINK-25143 URL: https://issues.apache.org/jira/browse/FLINK-25143 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends, Tests Reporter: Roman Khachatryan Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25024) Add ChangelogBackend documentation
Roman Khachatryan created FLINK-25024: - Summary: Add ChangelogBackend documentation Key: FLINK-25024 URL: https://issues.apache.org/jira/browse/FLINK-25024 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Roman Khachatryan Fix For: 1.15.0 Currently, changelog backend is hidden from users documentation-wise. Once the feature is ready, the following needs to be documented: * General description (page [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/] ) * Configuration (page [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/] - StateChangelogOptions, FsStateChangelogOptions) * Uploader metrics (page [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/] , see FLINK-23486) -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24938) Checkpoint cleaner is closed before checkpoints are discarded
Roman Khachatryan created FLINK-24938: - Summary: Checkpoint cleaner is closed before checkpoints are discarded Key: FLINK-24938 URL: https://issues.apache.org/jira/browse/FLINK-24938 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.14.0, 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0, 1.14.1 When CompletedCheckpointStore shuts down it tries to discard some checkpoints using CheckpointCleaner. The latter is closed asynchronously since FLINK-23647 and before the Store. Visible as warning when running ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsZookeeper: {code} 2021-11-17 10:47:10,599 Fail to remove checkpoint during shutdown. [DefaultCompletedCheckpointStore flink-akka.actor.default-dispatcher-5] java.lang.IllegalStateException: CheckpointsCleaner has already been closed at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[classes/:?] at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.incrementNumberOfCheckpointsToClean(CheckpointsCleaner.java:105) ~[classes/:?] at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.cleanup(CheckpointsCleaner.java:87) ~[classes/:?] at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.cleanCheckpoint(CheckpointsCleaner.java:62) ~[classes/:?] at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.tryRemoveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:2 at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.shutdown(DefaultCompletedCheckpointStore.java:172) ~[classes/:?] at org.apache.flink.runtime.scheduler.SchedulerBase.shutDownCheckpointServices(SchedulerBase.java:222) ~[classes/:?] {code} But the test still passes. cc: [~pnowojski] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24903) AdaptiveSchedulerTest.testJobStatusListenerNotifiedOfJobStatusChanges unstable
Roman Khachatryan created FLINK-24903: - Summary: AdaptiveSchedulerTest.testJobStatusListenerNotifiedOfJobStatusChanges unstable Key: FLINK-24903 URL: https://issues.apache.org/jira/browse/FLINK-24903 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 [https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1225=logs=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267=511d2595-ec54-5ab7-86ce-92f328796f20=7753] Locally, it fails ~14 runs out of 100 (when running only testJobStatusListenerNotifiedOfJobStatusChanges in a loop). It looks like the job termination future is always completed before the jobStatusChangeListener is notified (AdaptiveScheduler.transitionToState, targetState.getState() completes the future). Sleeping for 1ms before checking the assertion prevents the failure. cc: [~trohrmann] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24864) Release TaskManagerJobMetricGroup with the last slot rather than task
Roman Khachatryan created FLINK-24864: - Summary: Release TaskManagerJobMetricGroup with the last slot rather than task Key: FLINK-24864 URL: https://issues.apache.org/jira/browse/FLINK-24864 Project: Flink Issue Type: Sub-task Components: Runtime / Metrics, Runtime / State Backends Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.15.0 [https://docs.google.com/document/d/1k5WkWIYzs3n3GYQC76H9BLGxvN3wuq7qUHJuBPR9YX0/edit?usp=sharing] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24826) Performance regression in HeapState benchmarks on Nov 8 2021
Roman Khachatryan created FLINK-24826: - Summary: Performance regression in HeapState benchmarks on Nov 8 2021 Key: FLINK-24826 URL: https://issues.apache.org/jira/browse/FLINK-24826 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=listAdd.HEAP=2=200=off=on=on http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=listUpdate.HEAP=2=200=off=on=on http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=mapRemove.HEAP=2=200=off=on=on All affected benchmarks: listAdd.HEAP listAppend.HEAP listGet.HEAP listGetAndIterate.HEAP listUpdate.HEAP mapAdd.HEAP mapIsEmpty.HEAP mapKeys.HEAP mapRemove.HEAP mapUpdate.HEAP mapValues.HEAP valueAdd.HEAP good commit: b3b50559cf22f188ddb9cad62ecfb83881c47961 bad commit: fc4f255644a64bb556b0dcefb165a9c772164c5b It's very likely fc4f255644a64bb556b0dcefb165a9c772164c5b is the cause of the regression (in between there are only docs updates and this one is heap-related). cc: [~Zakelly] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24611) Prevent JM from discarding state on checkpoint abortion
Roman Khachatryan created FLINK-24611: - Summary: Prevent JM from discarding state on checkpoint abortion Key: FLINK-24611 URL: https://issues.apache.org/jira/browse/FLINK-24611 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Affects Versions: 1.15.0 Reporter: Roman Khachatryan Fix For: 1.15.0 When a checkpoint is aborted, JM discards any state that was sent to it and wasn't used in other checkpoints. This forces incremental state backends to wait for confirmation from JM before re-using this state. For changelog backend this is even more critical. One approach proposed was to make backends/TMs responsible for their state, until it's not shared with other TMs, i.e. until rescaling (private/shared state ownership track). However, that approach is quite invasive. An alternative solution would be: 1. SharedStateRegistry remembers the latest checkpoint for each shared state (instead of usage count currently) 2. CompletedCheckpointStore notifies it about the lowest valid checkpoint (on subsumption) 3. SharedStateRegistry then discards any state associated with the lower (subsumed/aborted) checkpoints So the aborted checkpoint can only be discarded after some subsequent successful checkpoint (which can mark state as used). Only JM code is changed. Implementation considerations. On subsumption, JM needs to find all the unused state and discard it. This can either be done by 1) simply traversing all entries; or by 2) maintaining a set of entries per checkpoint (e.g. SortedMap>). 
This allows skipping unnecessary traversal at the cost of higher memory usage. In both cases: - each entry stores the last checkpoint ID it was used in (long) - the key is hashed (even with plain traversal, map.entrySet.iterator.remove() computes the hash internally) Given the following constraints: - 10M state entries at most - 10 (retained) checkpoints at most - 10 checkpoints per second at most - a state entry key takes 32B (usually a UUID or two UUIDs) The extra space for (2) would be on the order of 10M*32B=38Mb. The extra time for (1) would be on the order of 10M * 10 checkpoints per second * ratio of outdated entries per checkpoint. Depending on the ratio and the hardware, this could take up to hundreds of ms per second, blocking the main thread. So approach (2) seems reasonable. The following cases shouldn't pose any difficulties: 1. Recovery, re-scaling, and state used by not all or by no tasks - we still register all states on recovery even after FLINK-22483/FLINK-24086 2. PlaceholderStreamStateHandles 3. Cross-task state sharing - not an issue as long as everything is managed by JM 4. Dependencies between SharedStateRegistry and CompletedCheckpointStore - simple after FLINK-24086 The following should be kept in mind: 1. On job cancellation, the state of aborted checkpoints should be cleaned up explicitly 2. Savepoints should be ignored and should not change CheckpointStore.lowestCheckpointID -- This message was sent by Atlassian Jira (v8.3.4#803005)
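Approach (2) above could be sketched roughly as follows; all names are hypothetical and the real SharedStateRegistry API differs. Each entry records the last checkpoint it was used in, and a per-checkpoint index lets subsumption discard only the outdated buckets instead of traversing all entries:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of approach (2): a per-checkpoint index of shared state keys.
class SharedStateIndex {
    private final Map<String, Long> lastUsedCheckpoint = new HashMap<>();
    private final SortedMap<Long, Set<String>> entriesByCheckpoint = new TreeMap<>();

    // Register (or re-register) a shared state entry as used by a checkpoint.
    void registerUse(String stateKey, long checkpointId) {
        Long previous = lastUsedCheckpoint.put(stateKey, checkpointId);
        if (previous != null) {
            entriesByCheckpoint.getOrDefault(previous, new HashSet<>()).remove(stateKey);
        }
        entriesByCheckpoint.computeIfAbsent(checkpointId, k -> new HashSet<>()).add(stateKey);
    }

    // Called on subsumption: collect entries last used by checkpoints below the
    // lowest retained one, touching only the outdated buckets.
    Set<String> discardBelow(long lowestRetainedCheckpointId) {
        Set<String> discarded = new HashSet<>();
        SortedMap<Long, Set<String>> outdated =
                entriesByCheckpoint.headMap(lowestRetainedCheckpointId);
        for (Set<String> keys : outdated.values()) {
            discarded.addAll(keys);
        }
        discarded.forEach(lastUsedCheckpoint::remove);
        outdated.clear(); // headMap is a view: clearing removes from the backing map
        return discarded;
    }
}
```

A state re-used by a later checkpoint migrates to that checkpoint's bucket on re-registration, so an aborted checkpoint's state survives until a subsequent successful checkpoint subsumes it, matching the behavior described above.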
[jira] [Created] (FLINK-24402) Add back-pressure metrics for the ChangelogStateBackend
Roman Khachatryan created FLINK-24402: - Summary: Add back-pressure metrics for the ChangelogStateBackend Key: FLINK-24402 URL: https://issues.apache.org/jira/browse/FLINK-24402 Project: Flink Issue Type: Sub-task Components: Benchmarks Reporter: Roman Khachatryan Fix For: 1.15.0 E.g. in-flight requests, request size, latency, number of “Logs” per request, errors. With back-pressure (FLINK-23381) it's very important because the task will be shown as busy in the UI. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23971) PulsarSourceITCase.testIdleReader failed on azure
Roman Khachatryan created FLINK-23971: - Summary: PulsarSourceITCase.testIdleReader failed on azure Key: FLINK-23971 URL: https://issues.apache.org/jira/browse/FLINK-23971 Project: Flink Issue Type: Bug Reporter: Roman Khachatryan {code} [ERROR] Failures: [ERROR] PulsarSourceITCase>SourceTestSuiteBase.testIdleReader:193 Expected: Records consumed by Flink should be identical to test data and preserve the order in multiple splits but: Unexpected record 'tj7MpFRWX95GzBpSF3CCjxKSal6bRhR0aF' {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22819=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24448 This is the same error as in FLINK-23828 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23950) Revert FLINK-23738 (i.e. unhide config, API, docs)
Roman Khachatryan created FLINK-23950: - Summary: Revert FLINK-23738 (i.e. unhide config, API, docs) Key: FLINK-23950 URL: https://issues.apache.org/jira/browse/FLINK-23950 Project: Flink Issue Type: Sub-task Components: Documentation, Runtime / State Backends Reporter: Roman Khachatryan Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23876) Remove limitation about JDBC exactly once sink about multiple connections per transaction
Roman Khachatryan created FLINK-23876: - Summary: Remove limitation about JDBC exactly once sink about multiple connections per transaction Key: FLINK-23876 URL: https://issues.apache.org/jira/browse/FLINK-23876 Project: Flink Issue Type: Improvement Components: Connectors / JDBC, Documentation Affects Versions: 1.14.0 Reporter: Roman Khachatryan Fix For: 1.14.0 [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink] says: {quote}Attention: In 1.13, Flink JDBC sink does not support exactly-once mode with MySQL or other databases that do not support multiple XA transaction per connection. We will improve the support in FLINK-22239.{quote} In FLINK- connection pooling was added, so this limitation doesn't apply anymore; the note should be removed or replaced with a list of databases. Connection pooling should be enabled explicitly: the docs should list the DBs requiring this (mysql, postgres) and give example code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23862) Race condition while cancelling task during initialization
Roman Khachatryan created FLINK-23862: - Summary: Race condition while cancelling task during initialization Key: FLINK-23862 URL: https://issues.apache.org/jira/browse/FLINK-23862 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.14.0 Reporter: Roman Khachatryan Fix For: 1.14.0 While debugging the recent failures in FLINK-22889, I see that sometimes the operator chain is not closed if the task is cancelled while it's being initialized. The reason is that on restore(), cleanUpInvoke() is only called if there was an exception, including CancelTaskException. The latter is only thrown if StreamTask.canceled is set, i.e. TaskCanceler has called StreamTask.cancel(). So if the StreamTask is cancelled between restore and normal invoke, it may not close the operator chain and not do other cleanup. One solution is to make StreamTask.cleanup visible to, and called from, Task. cc: [~akalashnikov], [~pnowojski] -- This message was sent by Atlassian Jira (v8.3.4#803005)
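The proposed direction (cleanup visible to and callable from the owning Task, so it runs exactly once regardless of where cancellation lands) might be sketched like this; the types below are illustrative stand-ins, not the real StreamTask/Task classes:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: cleanup runs exactly once, whether the task completes,
// fails, or is cancelled in the window between restore() and invoke().
class TaskLifecycle {
    private final AtomicBoolean cleanedUp = new AtomicBoolean(false);
    private final Runnable restore;
    private final Runnable invoke;
    private final Runnable cleanup;
    private volatile boolean canceled;

    TaskLifecycle(Runnable restore, Runnable invoke, Runnable cleanup) {
        this.restore = restore;
        this.invoke = invoke;
        this.cleanup = cleanup;
    }

    void cancel() {
        canceled = true;
    }

    void run() {
        try {
            restore.run();
            // The race from the report: cancel() landing exactly here previously
            // skipped cleanup, because no exception was thrown.
            if (!canceled) {
                invoke.run();
            }
        } finally {
            cleanUpOnce();
        }
    }

    // Visible to the owning Task, which may also call it during forced cancellation.
    void cleanUpOnce() {
        if (cleanedUp.compareAndSet(false, true)) {
            cleanup.run();
        }
    }
}
```

The compare-and-set guard makes the extra call site safe: whichever of the task thread or the canceler reaches cleanup first performs it, the other becomes a no-op.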
[jira] [Created] (FLINK-23811) Handle FINISHED subtasks in CommonTestUtils.waitForAllTaskRunning
Roman Khachatryan created FLINK-23811: - Summary: Handle FINISHED subtasks in CommonTestUtils.waitForAllTaskRunning Key: FLINK-23811 URL: https://issues.apache.org/jira/browse/FLINK-23811 Project: Flink Issue Type: Improvement Components: Tests Affects Versions: 1.14.0 Reporter: Roman Khachatryan CommonTestUtils.waitForAllTaskRunning returns when all the subtasks are running AND the job is running and not finished. However, with FLIP-147, subtasks may finish while the job is still running. So the method won't return and will instead time out. The solution could be: - For new tests that can have finished subtasks, return if the subtask is RUNNING || FINISHED - For old tests (that assume no finished subtasks), throw an exception Note that a subtask may be in some other state (e.g. CANCELLED), which is fine, as it can change after failing over the job. This change is extracted from FLINK-21090 into a separate ticket because multiple IT cases might be affected. -- This message was sent by Atlassian Jira (v8.3.4#803005)
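The two proposed modes could look roughly like this (method and enum names are illustrative, not the actual CommonTestUtils API):

```java
import java.util.List;

// Illustrative sketch of the proposed waiting conditions under FLIP-147 semantics.
class TaskStateWaiter {
    enum State { CREATED, RUNNING, FINISHED, CANCELED }

    // New-style check: FINISHED counts as "reached running", since the subtask
    // may have legitimately completed while the job is still running.
    static boolean allRunningOrFinished(List<State> subtaskStates) {
        return subtaskStates.stream()
                .allMatch(s -> s == State.RUNNING || s == State.FINISHED);
    }

    // Old-style check: tests that assume no finished subtasks should fail fast
    // with a clear error instead of timing out.
    static boolean allRunningStrict(List<State> subtaskStates) {
        for (State s : subtaskStates) {
            if (s == State.FINISHED) {
                throw new IllegalStateException(
                        "Subtask finished, but the test assumes no finished subtasks");
            }
        }
        return subtaskStates.stream().allMatch(s -> s == State.RUNNING);
    }
}
```

A polling loop would call one of these predicates until it returns true or the deadline expires; the strict variant turns the silent timeout described above into an immediate, diagnosable failure.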
[jira] [Created] (FLINK-23770) Unable to recover after source fully finished
Roman Khachatryan created FLINK-23770: - Summary: Unable to recover after source fully finished Key: FLINK-23770 URL: https://issues.apache.org/jira/browse/FLINK-23770 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.14.0 Reporter: Roman Khachatryan Fix For: 1.14.0 When running one of the IT cases from https://github.com/apache/flink/pull/16773 I see the following failure: {code} 10194 [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure. org.apache.flink.util.FlinkRuntimeException: Can not restore vertex Source: Custom Source -> Timestamps/Watermarks(cbc357ccb763df2852fee8c4fc7d55f2) which contain both finished and unfinished operators at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$VerticesFinishedCache.calculateIfFinished(CheckpointCoordinator.java:1651) ~[classes/:?] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$VerticesFinishedCache.lambda$getOrUpdate$0(CheckpointCoordinator.java:1631) ~[classes/:?] at java.util.HashMap.computeIfAbsent(HashMap.java:1127) ~[?:1.8.0_271] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$VerticesFinishedCache.getOrUpdate(CheckpointCoordinator.java:1629) ~[classes/:?] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.validateFinishedOperators(CheckpointCoordinator.java:1674) ~[classes/:?] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1577) ~[classes/:?] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks(CheckpointCoordinator.java:1438) ~[classes/:?] at org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:398) ~[classes/:?] at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasks(DefaultScheduler.java:317) ~[classes/:?] 
at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$null$2(DefaultScheduler.java:287) ~[classes/:?] at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) ~[?:1.8.0_271] at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701) ~[?:1.8.0_271] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_271] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT] at akka.actor.Actor.aroundReceive(Actor.scala:537)