[jira] [Created] (FLINK-35584) Support Java 21 in flink-docker

2024-06-12 Thread Josh England (Jira)
Josh England created FLINK-35584:


 Summary: Support Java 21 in flink-docker
 Key: FLINK-35584
 URL: https://issues.apache.org/jira/browse/FLINK-35584
 Project: Flink
  Issue Type: Improvement
  Components: flink-docker
Reporter: Josh England


Support Java 21. Base images are available for Java 8, 11, and 17, but since Apache 
Flink now supports Java 21 (albeit in beta), it would be good to have a base 
image for it too.





[jira] [Created] (FLINK-35583) Flinker-connector-jdbc is expected to support DDL synchronization for mysql

2024-06-12 Thread linux (Jira)
linux created FLINK-35583:
-

 Summary: Flinker-connector-jdbc is expected to support DDL 
synchronization for mysql
 Key: FLINK-35583
 URL: https://issues.apache.org/jira/browse/FLINK-35583
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: linux


I strongly hope that flink-connector-jdbc will support DDL synchronization for 
MySQL. This use case is very common and is a feature that many users now expect 
to have. I hope this functionality can be added. Thanks a lot!





[jira] [Created] (FLINK-35582) Marking ingestDB as the default recovery mode for rescaling

2024-06-12 Thread Yue Ma (Jira)
Yue Ma created FLINK-35582:
--

 Summary: Marking ingestDB as the default recovery mode for 
rescaling
 Key: FLINK-35582
 URL: https://issues.apache.org/jira/browse/FLINK-35582
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0








[jira] [Created] (FLINK-35580) Fix ingestDB recovery mode related bugs

2024-06-12 Thread Yue Ma (Jira)
Yue Ma created FLINK-35580:
--

 Summary: Fix ingestDB recovery mode related bugs
 Key: FLINK-35580
 URL: https://issues.apache.org/jira/browse/FLINK-35580
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0








[jira] [Created] (FLINK-35581) Remove comments from the code related to ingestDB

2024-06-12 Thread Yue Ma (Jira)
Yue Ma created FLINK-35581:
--

 Summary: Remove comments from the code related to ingestDB
 Key: FLINK-35581
 URL: https://issues.apache.org/jira/browse/FLINK-35581
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0








[jira] [Created] (FLINK-35579) Update the FrocksDB version in FLINK

2024-06-12 Thread Yue Ma (Jira)
Yue Ma created FLINK-35579:
--

 Summary: Update the FrocksDB version in FLINK
 Key: FLINK-35579
 URL: https://issues.apache.org/jira/browse/FLINK-35579
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0








[jira] [Created] (FLINK-35578) Release Frocksdb-8.10.0 official products

2024-06-12 Thread Yue Ma (Jira)
Yue Ma created FLINK-35578:
--

 Summary: Release Frocksdb-8.10.0 official products
 Key: FLINK-35578
 URL: https://issues.apache.org/jira/browse/FLINK-35578
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0








[jira] [Created] (FLINK-35577) Setup the CI environment for FRocksDB-8.10

2024-06-12 Thread Yue Ma (Jira)
Yue Ma created FLINK-35577:
--

 Summary: Setup the CI environment for FRocksDB-8.10
 Key: FLINK-35577
 URL: https://issues.apache.org/jira/browse/FLINK-35577
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0








[jira] [Created] (FLINK-35576) FRocksdb Cherry pick IngestDB requires commit

2024-06-12 Thread Yue Ma (Jira)
Yue Ma created FLINK-35576:
--

 Summary: FRocksdb Cherry pick IngestDB requires  commit
 Key: FLINK-35576
 URL: https://issues.apache.org/jira/browse/FLINK-35576
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0


We support the ingestDB-related API in FRocksDB-8.10.0, but many of the 
ingestDB-related fixes were only integrated into later RocksDB versions. 
So we need to cherry-pick these fix commits into FRocksDB.
They mainly include:
[https://github.com/facebook/rocksdb/pull/11646]
[https://github.com/facebook/rocksdb/pull/11868]
[https://github.com/facebook/rocksdb/pull/11811]
[https://github.com/facebook/rocksdb/pull/11381]
[https://github.com/facebook/rocksdb/pull/11379]
[https://github.com/facebook/rocksdb/pull/11378]





[jira] [Created] (FLINK-35575) FRocksDB supports disabling perf context during compilation

2024-06-12 Thread Yue Ma (Jira)
Yue Ma created FLINK-35575:
--

 Summary: FRocksDB supports disabling perf context during 
compilation
 Key: FLINK-35575
 URL: https://issues.apache.org/jira/browse/FLINK-35575
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0


In FRocksDB 6, the thread-local perf context is disabled by reverting a specific 
commit (FLINK-19710). However, this creates conflicts and makes upgrading more 
difficult. We found that disabling *PERF_CONTEXT* at compile time can improve the 
performance of the state benchmark by about 5% and does not create any conflicts. 
So we plan to support disabling the perf context during compilation in the new FRocksDB version.





[jira] [Created] (FLINK-35574) Set up base branch for FrocksDB-8.10

2024-06-12 Thread Yue Ma (Jira)
Yue Ma created FLINK-35574:
--

 Summary: Set up base branch for FrocksDB-8.10
 Key: FLINK-35574
 URL: https://issues.apache.org/jira/browse/FLINK-35574
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0


As the first part of FLINK-35573, we need to prepare a base branch for 
FRocksDB-8.10.0. Mainly, it needs to be checked out from version 8.10.0 of the 
RocksDB community repository. Then we cherry-pick the commits used by Flink from 
FRocksDB-6.20.3 onto 8.10.0.





[jira] [Created] (FLINK-35573) [FLIP-447] Upgrade FRocksDB from 6.20.3 to 8.10.0

2024-06-12 Thread Yue Ma (Jira)
Yue Ma created FLINK-35573:
--

 Summary: [FLIP-447] Upgrade FRocksDB from 6.20.3 to 8.10.0
 Key: FLINK-35573
 URL: https://issues.apache.org/jira/browse/FLINK-35573
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 2.0.0
Reporter: Yue Ma
 Fix For: 2.0.0


The FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0
 

*_This FLIP proposes upgrading the version of FRocksDB in the Flink project 
from 6.20.3 to 8.10.0._*

_RocksDBStateBackend is widely used by Flink users in large-state scenarios. The 
last upgrade of FRocksDB was in Flink 1.14, and it mainly added features such as 
ARM platform support, the deleteRange API, and periodic compaction. It has been a 
long time since then, and RocksDB has now reached version 8.x. The main motivation 
for this upgrade is to leverage the features of newer RocksDB versions to make 
Flink's RocksDBStateBackend more powerful. Since RocksDB is also continuously 
optimizing and fixing bugs, we hope to keep FRocksDB more or less in sync with 
RocksDB and upgrade it periodically._





[jira] [Created] (FLINK-35572) flink db2 cdc default value error

2024-06-12 Thread junxin lai (Jira)
junxin  lai created FLINK-35572:
---

 Summary: flink db2 cdc default value error 
 Key: FLINK-35572
 URL: https://issues.apache.org/jira/browse/FLINK-35572
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: junxin  lai


I am using Flink DB2 CDC to sync a database in real time, but it fails to handle 
default values in the schema while making the snapshot.

After digging deeper into the problem, I found that this seems to be a bug in 
Debezium that was fixed in 2.0.0.CR1 ([https://issues.redhat.com/browse/DBZ-4990]). 
The latest Flink CDC 3.1 uses Debezium version 1.9.8.Final.

Default values are a common configuration in DB2. Is there a way we can 
backport this patch to 1.9.8.Final?





[jira] [Created] (FLINK-35571) ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation

2024-06-12 Thread Grace Grimwood (Jira)
Grace Grimwood created FLINK-35571:
--

 Summary: ProfilingServiceTest.testRollingDeletion intermittently 
fails due to improper test isolation
 Key: FLINK-35571
 URL: https://issues.apache.org/jira/browse/FLINK-35571
 Project: Flink
  Issue Type: Bug
  Components: Tests
 Environment: *Git revision:*
{code:bash}
$ git show
commit b8d527166e095653ae3ff5c0431bf27297efe229 (HEAD -> master)
{code}

*Java info:*
{code:bash}
$ java -version
openjdk version "17.0.11" 2024-04-16
OpenJDK Runtime Environment Temurin-17.0.11+9 (build 17.0.11+9)
OpenJDK 64-Bit Server VM Temurin-17.0.11+9 (build 17.0.11+9, mixed mode)
{code}

{code:bash}
$ sdk current
Using:
java: 17.0.11-tem
maven: 3.8.6
scala: 2.12.19
{code}

*OS info:*
{code:bash}
$ uname -av
Darwin MacBook-Pro 23.5.0 Darwin Kernel Version 23.5.0: Wed May  1 20:14:38 PDT 
2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6020 arm64
{code}

*Hardware info:*
{code:bash}
$ sysctl -a | grep -e 'machdep\.cpu\.brand_string\:' -e 
'machdep\.cpu\.core_count\:' -e 'hw\.memsize\:'
hw.memsize: 34359738368
machdep.cpu.core_count: 12
machdep.cpu.brand_string: Apple M2 Pro
{code}
Reporter: Grace Grimwood
 Attachments: 
20240612_181148_mvn-clean-package_flink-runtime_also-make.log

*Symptom:*
The test *{{ProfilingServiceTest.testRollingDeletion}}* fails with the 
following error:
{code:java}
[ERROR] Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 25.32 s 
<<< FAILURE! -- in org.apache.flink.runtime.util.profiler.ProfilingServiceTest
[ERROR] 
org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion 
-- Time elapsed: 9.264 s <<< FAILURE!
org.opentest4j.AssertionFailedError: expected: <3> but was: <6>
at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at 
org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
at 
org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:175)
at 
org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at 
java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
{code}
The number of extra files found varies from failure to failure.

*Cause:*
Many of the tests in *{{ProfilingServiceTest}}* rely on a specific 
configuration of the *{{ProfilingService}}* instance, but 
*{{ProfilingService.getInstance}}* does not check whether an existing 
instance's config matches the provided config before returning it. Because of 
this, and because JUnit does not guarantee a specific ordering of tests (unless 
they are specifically annotated), it is possible for these tests to receive an 
instance that does not behave in the expected way and therefore fail.

*Analysis:*
In troubleshooting the test failure, we tried adding an extra assertion to 
*{{ProfilingServiceTest.setUp}}* to validate the directories being written to 
were correct:
{code:java}
Assertions.assertEquals(tempDir.toString(), 
profilingService.getProfilingResultDir());
{code}
That assert produced the following failure:
{code:java}
org.opentest4j.AssertionFailedError: expected: <...> but was: <...>
{code}
This failure shows that the *{{ProfilingService}}* returned by 
*{{ProfilingService.getInstance}}* in the setup is not using the correct 
directory, and therefore cannot be the correct instance for this test class 
because it has the wrong config.

This is because the static method *{{ProfilingService.getInstance}}* attempts 
to reuse any existing instance of *{{ProfilingService}}* before it creates a 
new one, and it disregards any differences in config in doing so. This means that 
if another test instantiates a *{{ProfilingService}}* with a different config 
first and does not close it, that previous instance will be provided to 
*{{ProfilingServiceTest}}* rather than the new instance those tests seem to 
expect.
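
A minimal sketch of one way to restore isolation, based on the observation above (it assumes only that the instance obtained in setUp can be closed, as this report already implies; it is not a reviewed fix):
{code:java}
import org.junit.jupiter.api.AfterEach;

class ProfilingServiceTestIsolationSketch {

    // Stands in for the ProfilingService instance obtained in setUp() via getInstance(config).
    private AutoCloseable profilingService;

    @AfterEach
    void tearDown() throws Exception {
        // Release the shared instance after every test so that a later call to
        // getInstance(...) cannot silently reuse an instance that was created
        // with a different configuration (e.g. a different profiling result dir).
        if (profilingService != null) {
            profilingService.close();
        }
    }
}
{code}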

[jira] [Created] (FLINK-35570) Consider PlaceholderStreamStateHandle in checkpoint file merging

2024-06-11 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35570:
---

 Summary: Consider PlaceholderStreamStateHandle in checkpoint file 
merging
 Key: FLINK-35570
 URL: https://issues.apache.org/jira/browse/FLINK-35570
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Zakelly Lan
Assignee: Zakelly Lan


In checkpoint file merging, we should take {{PlaceholderStreamStateHandle}} 
into account during lifecycle management, since it can refer to a file-merged handle.





[jira] [Created] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Jane Chan (Jira)
Jane Chan created FLINK-35569:
-

 Summary: 
SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
 failed
 Key: FLINK-35569
 URL: https://issues.apache.org/jira/browse/FLINK-35569
 Project: Flink
  Issue Type: Bug
  Components: Build System / Azure Pipelines, Build System / CI
Affects Versions: 1.20.0
Reporter: Jane Chan








[jira] [Created] (FLINK-35568) Add imagePullSecrets for FlinkDeployment spec

2024-06-11 Thread Gang Huang (Jira)
Gang Huang created FLINK-35568:
--

 Summary: Add imagePullSecrets for FlinkDeployment spec
 Key: FLINK-35568
 URL: https://issues.apache.org/jira/browse/FLINK-35568
 Project: Flink
  Issue Type: Improvement
Reporter: Gang Huang


I am confused about how to configure imagePullSecrets for a private Docker registry, 
since no related parameters seem to be documented in the official docs 
(https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/).





[jira] [Created] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error

2024-06-11 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35567:
-

 Summary: CDC BinaryWriter cast NullableSerializerWrapper error 
 Key: FLINK-35567
 URL: https://issues.apache.org/jira/browse/FLINK-35567
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Hongshun Wang
 Fix For: cdc-3.1.1


Currently, we generate data type serializers via 
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]),
 which wraps each serializer in a NullableSerializerWrapper.
{code:java}
public BinaryRecordDataGenerator(DataType[] dataTypes) {
    this(
            dataTypes,
            Arrays.stream(dataTypes)
                    .map(InternalSerializers::create)
                    .map(NullableSerializerWrapper::new)
                    .toArray(TypeSerializer[]::new));
}
{code}
However, when it is used in BinaryWriter#write and the type is ARRAY/MAP/ROW, 
the NullableSerializerWrapper is cast to the concrete serializer type 
(e.g. ArrayDataSerializer), so an exception is thrown:
{code:java}
java.lang.ClassCastException: 
org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be 
cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer
at 
org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134)
at 
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89)
 {code}





[jira] [Created] (FLINK-35566) Consider promoting TypeSerializer from PublicEvolving to Public

2024-06-11 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-35566:
--

 Summary: Consider promoting TypeSerializer from PublicEvolving to 
Public
 Key: FLINK-35566
 URL: https://issues.apache.org/jira/browse/FLINK-35566
 Project: Flink
  Issue Type: Technical Debt
  Components: API / Core
Reporter: Martijn Visser


While working on implementing FLINK-35378, I ran into the problem that 
TypeSerializer has been PublicEvolving since Flink 1.0. We should consider 
annotating it as Public.





[jira] [Created] (FLINK-35565) Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset

2024-06-11 Thread Naci Simsek (Jira)
Naci Simsek created FLINK-35565:
---

 Summary: Flink KafkaSource Batch Job Gets Into Infinite Loop after 
Resetting Offset
 Key: FLINK-35565
 URL: https://issues.apache.org/jira/browse/FLINK-35565
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: kafka-3.1.0
 Environment: This is reproduced on a *Flink 1.18.1* with the latest 
Kafka connector 3.1.0-1.18 on a session cluster.
Reporter: Naci Simsek
 Attachments: image-2024-06-11-11-19-09-889.png, 
taskmanager_localhost_54489-ac092a_log.txt

h2. Summary

A Flink batch job gets into an infinite fetch loop and cannot gracefully 
finish if the connected Kafka topic is empty and the starting offset value in the 
Flink job is lower than the current start/end offset of the related topic. See below 
for details:
h2. How to reproduce

A Flink +*batch*+ job that uses a {*}KafkaSource{*} consumes events 
from a Kafka topic.

The related Kafka topic is empty, there are no events, and the offset value is as 
below: *15*

!image-2024-06-11-11-19-09-889.png|width=895,height=256!

 

The Flink job uses a *specific starting offset* value, which is +*less*+ than the 
current offset of the topic/partition.

See below; it is set to “4”:

{code:java}
package naci.grpId;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class KafkaSource_Print {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Define the specific offsets for the partitions
        Map<TopicPartition, Long> specificOffsets = new HashMap<>();
        // Start from offset 4 for partition 0
        specificOffsets.put(new TopicPartition("topic_test", 0), 4L);

        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers("localhost:9093")  // Make sure the port is correct
                .setTopics("topic_test")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
                .setBounded(OffsetsInitializer.latest())
                .build();

        DataStream<String> stream = env.fromSource(
                kafkaSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source"
        );
        stream.print();

        env.execute("Flink KafkaSource test job");
    }
}
{code}

 

Here are the initial logs printed related to the offset, as soon as the job 
gets submitted:

{code:java}
2024-05-30 12:15:50,010 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, 
StoppingOffset: 15]]
2024-05-30 12:15:50,069 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, 
StoppingOffset: 15]]]
2024-05-30 12:15:50,074 TRACE 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
Seeking starting offsets to specified offsets: {topic_test-0=4}
2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer   
   [] - [Consumer clientId=KafkaSource--2381765882724812354-0, 
groupId=null] Seeking to offset 4 for partition topic_test-0
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
SplitsChange handling result: [topic_test-0, start:4, stop: 15]
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished running task AddSplitsTask: [[[Partition: topic_test-0, 
StartingOffset: 4, StoppingOffset: 15]]]
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run FetchTask{code}

 

Since the starting offset *4* is *out of range* for the 
Kafka topic, the KafkaConsumer initiates an {*}offset +reset+{*}, as seen in the task 
manager logs:

{code:java}
2024-05-30 12:15:50,193 INFO  
org.apache.kafka.clients.consumer.internals.Fetcher  [] - [Consumer 
clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position 
FetchPosit

[jira] [Created] (FLINK-35564) The topic cannot be distributed on subtask when calculatePartitionOwner returns -1

2024-06-11 Thread Jira
中国无锡周良 created FLINK-35564:
--

 Summary: The topic cannot be distributed on subtask when 
calculatePartitionOwner returns -1
 Key: FLINK-35564
 URL: https://issues.apache.org/jira/browse/FLINK-35564
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.2
Reporter: 中国无锡周良


The topic cannot be assigned to any subtask when calculatePartitionOwner returns -1:
{code:java}
@VisibleForTesting
static int calculatePartitionOwner(String topic, int partitionId, int 
parallelism) {
int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % parallelism;
/*
 * Here, the assumption is that the id of Pulsar partitions are always 
ascending starting from
 * 0. Therefore, can be used directly as the offset clockwise from the 
start index.
 */
return (startIndex + partitionId) % parallelism;
} {code}
Here startIndex is a non-negative number calculated from topic.hashCode(), 
in the range [0, parallelism-1].

For a non-partitioned topic, partitionId is NON_PARTITION_ID = -1, so 
calculatePartitionOwner can return -1 (when startIndex is 0).

But:
{code:java}
@Override
public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
        List<Integer> readers) {
    if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
        return Optional.empty();
    }

    Map<Integer, List<PulsarPartitionSplit>> assignMap =
            new HashMap<>(pendingPartitionSplits.size());

    for (Integer reader : readers) {
        Set<PulsarPartitionSplit> splits = pendingPartitionSplits.remove(reader);
        if (splits != null && !splits.isEmpty()) {
            assignMap.put(reader, new ArrayList<>(splits));
        }
    }

    if (assignMap.isEmpty()) {
        return Optional.empty();
    } else {
        return Optional.of(new SplitsAssignment<>(assignMap));
    }
}
{code}
No reader can ever have the index -1, so when calculatePartitionOwner returns -1 
the split is registered in pendingPartitionSplits under an owner that no reader 
matches: pendingPartitionSplits.remove(reader) always returns null for it, and the 
topic is never assigned to any subtask. I simulated such a topic locally and 
confirmed that its messages were indeed not processed.
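
For illustration, one possible direction (an assumption on my side, not the connector's actual patch) is to normalize the owner index so that a non-partitioned topic with partitionId = -1 still maps into [0, parallelism):
{code:java}
public class PartitionOwnerSketch {

    static final int NON_PARTITION_ID = -1;

    static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % parallelism;
        // Math.floorMod keeps the result in [0, parallelism) even when
        // startIndex + partitionId is -1 for a non-partitioned topic.
        return Math.floorMod(startIndex + partitionId, parallelism);
    }

    public static void main(String[] args) {
        // A topic whose startIndex happens to be 0 previously produced owner -1.
        System.out.println(calculatePartitionOwner("persistent://public/default/topic", NON_PARTITION_ID, 4));
    }
}
{code}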





[jira] [Created] (FLINK-35563) 'Run kubernetes application test' failed on AZP

2024-06-10 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35563:
--

 Summary: 'Run kubernetes application test' failed on AZP
 Key: FLINK-35563
 URL: https://issues.apache.org/jira/browse/FLINK-35563
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo








[jira] [Created] (FLINK-35562) WindowTableFunctionProcTimeRestoreTest produced no output for 900 seconds

2024-06-10 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35562:
--

 Summary: WindowTableFunctionProcTimeRestoreTest produced no output 
for 900 seconds
 Key: FLINK-35562
 URL: https://issues.apache.org/jira/browse/FLINK-35562
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo








[jira] [Created] (FLINK-35561) Flink REST API incorrect documentation

2024-06-10 Thread Shyam (Jira)
Shyam created FLINK-35561:
-

 Summary: Flink REST API incorrect documentation
 Key: FLINK-35561
 URL: https://issues.apache.org/jira/browse/FLINK-35561
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Runtime / REST
Reporter: Shyam


Flink REST API documentation for JAR upload 
([/jars/upload|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-upload])
 states that the response will contain fileName, which will then be used later 
to run Flink Jobs.

In the 
[/jars/:jarid/run|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run]
 documentation, the definition for :jarid is "String value that identifies a 
jar. When uploading the jar a path is returned, where the filename is the ID. 
This value is equivalent to the `id` field in the list of uploaded jars 
(/jars)."

 

The statement identifying the filename should be changed to:

String value that identifies a jar. When uploading the jar, a path is returned, 
where the {*}filename contains the ID{*}, and it is the text after the last 
forward slash. This value is equivalent to the `id` field in the list of 
uploaded jars (/jars).
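
For illustration, a client can derive the jar id from the {{filename}} returned by /jars/upload by taking the text after the last forward slash (a standalone sketch; the example path is made up):
{code:java}
public class JarIdFromUploadResponse {

    public static void main(String[] args) {
        // Example "filename" value as returned by POST /jars/upload (illustrative only).
        String filename = "/tmp/flink-web-upload/3b8a47f2-example_job.jar";

        // The jar id used in /jars/:jarid/run is the text after the last forward slash.
        String jarId = filename.substring(filename.lastIndexOf('/') + 1);

        System.out.println(jarId); // prints 3b8a47f2-example_job.jar
    }
}
{code}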





[jira] [Created] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern

2024-06-09 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-35560:
---

 Summary: Add query validator support to flink sql gateway via spi 
pattern
 Key: FLINK-35560
 URL: https://issues.apache.org/jira/browse/FLINK-35560
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Gateway
Reporter: dongwoo.kim


h3. Summary

Hello, I'd like to suggest query validator support in the Flink SQL gateway via an SPI 
pattern.

As an SQL gateway operator, there is a need for query validation so that only safe 
queries are executed and unsafe queries are dropped.
To address this need, I propose adding a {{QueryValidator}} interface in the Flink 
SQL gateway API package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to Flink SQL gateway operators.
h3. Interface

Below is a draft of the interface.
It takes an Operation and checks whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{scan.startup.timestamp-millis}}, and rejects the query when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public boolean validateQuery(Operation op) {
        if (op instanceof CreateTableOperation) {
            CreateTableOperation createTableOp = (CreateTableOperation) op;
            String connector =
                    createTableOp.getCatalogTable().getOptions().get("connector");
            if ("kafka".equals(connector)) {
                String startupTimestamp =
                        createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
                if (startupTimestamp != null
                        && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
                    return false;
                }
            }
        }
        return true;
    }
}
{code}

I'd be happy to implement this feature if we can reach an agreement. 
Thanks!





[jira] [Created] (FLINK-35559) Shading issue cause class conflict

2024-06-08 Thread Aleksandr Pilipenko (Jira)
Aleksandr Pilipenko created FLINK-35559:
---

 Summary: Shading issue cause class conflict
 Key: FLINK-35559
 URL: https://issues.apache.org/jira/browse/FLINK-35559
 Project: Flink
  Issue Type: Bug
  Components: Connectors / AWS
Affects Versions: aws-connector-4.3.0, aws-connector-4.2.0
Reporter: Aleksandr Pilipenko
 Fix For: aws-connector-4.4.0


An incorrect shading configuration causes a ClassCastException during exception 
handling when a job packages flink-connector-kinesis together with another AWS sink.





[jira] [Created] (FLINK-35558) [docs] "Edit This Page" tool does not follow contribution guidelines

2024-06-07 Thread Matt Braymer-Hayes (Jira)
Matt Braymer-Hayes created FLINK-35558:
--

 Summary: [docs] "Edit This Page" tool does not follow contribution 
guidelines
 Key: FLINK-35558
 URL: https://issues.apache.org/jira/browse/FLINK-35558
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Matt Braymer-Hayes


h1. Problem

The [documentation 
site|https://nightlies.apache.org/flink/flink-docs-release-1.19/] offers an 
"Edit This Page" button at the bottom of most pages 
([example|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/try-flink/local_installation/#summary]),
 allowing a user to quickly open a GitHub PR to resolve the issue.

 

Unfortunately this feature uses the version branch (e.g., {{{}release-1.19{}}}) 
as the base, whereas the [documentation contribution 
guide|https://flink.apache.org/how-to-contribute/contribute-documentation/#submit-your-contribution]
 expects {{master}} to be the base. Since these release branches are often 
incompatible with {{master}} (i.e., I can't do a simple rebase or merge), I end 
up not being able to use the "Edit This Page" feature and instead have to make 
the change myself on GitHub or locally.
h1. Solution

Edit the anchor 
([source|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/docs/layouts/partials/docs/footer.html#L30])
 to use {{master}} instead of {{{}.Site.Params.Branch{}}}. This would lower the 
barrier to entry significantly for docs changes and allow the "Edit This Page" 
feature to work as intended.





[jira] [Created] (FLINK-35557) MemoryManager only reserves memory per consumer type once

2024-06-07 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35557:
-

 Summary: MemoryManager only reserves memory per consumer type once
 Key: FLINK-35557
 URL: https://issues.apache.org/jira/browse/FLINK-35557
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends, Runtime / Task
Affects Versions: 1.18.1, 1.19.0, 1.17.2, 1.16.3, 1.20.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0, 1.19.1


# In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we 
[create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526]
 a reserve function
 # The function 
[decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61]
 the available Slot memory and fails if there's not enough memory
 # We pass it to {{SharedResources.getOrAllocateSharedResource}}
 # In {{SharedResources.getOrAllocateSharedResource}} , we check if the 
resource (memory) was already reserved by some key (e.g. 
{{{}state-rocks-managed-memory{}}})
 # If not, we create a new one and call the reserve function
 # If the resource was already reserved (not null), we do NOT reserve the 
memory again: 
https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71


So there will be only one (the first) memory reservation for RocksDB, for example, 
no matter how many state backends are created. This means that managed memory 
limits are not respected.





[jira] [Created] (FLINK-35556) Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED

2024-06-07 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35556:
-

 Summary: Wrong constant in 
RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED
 Key: FLINK-35556
 URL: https://issues.apache.org/jira/browse/FLINK-35556
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.19.0, 1.20.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0, 1.19.1


See 
https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java#L39





[jira] [Created] (FLINK-35555) Serializing List with null values throws NPE

2024-06-07 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-3:
-

 Summary: Serializing List with null values throws NPE
 Key: FLINK-3
 URL: https://issues.apache.org/jira/browse/FLINK-3
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.20.0
Reporter: Zhanghao Chen
 Fix For: 1.20.0


FLINK-34123 introduced built-in serialization support for java.util.List, which 
relies on the existing {{ListSerializer}} impl. However, {{ListSerializer}} 
does not allow null values, as it was originally designed for serializing 
{{ListState}} only, where null values are explicitly forbidden by the contract.

Directly adding a null marker to allow null values would break backward state 
compatibility, so we'll need to introduce a new List serializer and 
corresponding TypeInformation that allow null values for serializing user 
objects.
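
For illustration, the null-marker idea could look roughly like the following standalone sketch (plain java.io streams rather than Flink's serializer API, just to show the encoding):
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullMarkerListSketch {

    // Write the list size, then for each element a boolean null marker followed by the value.
    static void writeList(List<String> list, DataOutputStream out) throws IOException {
        out.writeInt(list.size());
        for (String element : list) {
            out.writeBoolean(element != null);
            if (element != null) {
                out.writeUTF(element);
            }
        }
    }

    static List<String> readList(DataInputStream in) throws IOException {
        int size = in.readInt();
        List<String> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            list.add(in.readBoolean() ? in.readUTF() : null);
        }
        return list;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeList(Arrays.asList("a", null, "c"), new DataOutputStream(bytes));
        List<String> roundTripped =
                readList(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(roundTripped); // prints [a, null, c]
    }
}
{code}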





[jira] [Created] (FLINK-35554) usrlib is not added to classpath when using containers

2024-06-07 Thread Josh England (Jira)
Josh England created FLINK-35554:


 Summary: usrlib is not added to classpath when using containers
 Key: FLINK-35554
 URL: https://issues.apache.org/jira/browse/FLINK-35554
 Project: Flink
  Issue Type: Bug
  Components: flink-docker
Affects Versions: 1.19.0
 Environment: Docker
Reporter: Josh England


We use flink-docker to create a "standalone" application, with a Dockerfile 
like...
 
{code:java}
FROM flink:1.18.1-java17
COPY application.jar /opt/flink/usrlib/artifacts/
{code}

However, after upgrading to 1.19.0 we found our application would not start. We 
saw errors like the following in the logs:


{noformat}
org.apache.flink.util.FlinkException: Could not load the provided entrypoint 
class.
   at 
org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:230)
   at 
org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.getPackagedProgram(StandaloneApplicationClusterEntryPoint.java:149)
   at 
org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.lambda$main$0(StandaloneApplicationClusterEntryPoint.java:90)
   at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
   at 
org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:89)
   Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
program's entry point class 'X' was not found in the jar file.
{noformat}

We were able to fix the issue by placing the application.jar in /opt/flink/lib 
instead. My guess is that the usrlib directory isn't being added to the 
classpath by the shell scripts that launch flink from a container.






[jira] [Created] (FLINK-35553) Integrate newly added trigger interface with checkpointing

2024-06-07 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-35553:
-

 Summary: Integrate newly added trigger interface with checkpointing
 Key: FLINK-35553
 URL: https://issues.apache.org/jira/browse/FLINK-35553
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Runtime / Coordination
Reporter: Matthias Pohl


This connects the newly introduced trigger logic (FLINK-35551) with the newly 
added checkpoint lifecycle listening feature (FLINK-35552).





[jira] [Created] (FLINK-35552) Move CheckpointStatsTracker out of ExecutionGraph into Scheduler

2024-06-07 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-35552:
-

 Summary: Move CheckpointStatsTracker out of ExecutionGraph into 
Scheduler
 Key: FLINK-35552
 URL: https://issues.apache.org/jira/browse/FLINK-35552
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Runtime / Coordination
Reporter: Matthias Pohl


The scheduler needs to know about the CheckpointStatsTracker to allow listening 
to checkpoint failures and completion.





[jira] [Created] (FLINK-35551) Introduces RescaleManager#onTrigger endpoint

2024-06-07 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-35551:
-

 Summary: Introduces RescaleManager#onTrigger endpoint
 Key: FLINK-35551
 URL: https://issues.apache.org/jira/browse/FLINK-35551
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl


The new endpoint would allow us to separate observing change events from 
actually triggering the rescale operation.





[jira] [Created] (FLINK-35550) Introduce new component RescaleManager

2024-06-07 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-35550:
-

 Summary: Introduce new component RescaleManager
 Key: FLINK-35550
 URL: https://issues.apache.org/jira/browse/FLINK-35550
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Matthias Pohl


The goal here is to collect the rescaling logic in a single component to 
improve testability.





[jira] [Created] (FLINK-35549) FLIP-461: Synchronize rescaling with checkpoint creation to minimize reprocessing for the AdaptiveScheduler

2024-06-07 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-35549:
-

 Summary: FLIP-461: Synchronize rescaling with checkpoint creation 
to minimize reprocessing for the AdaptiveScheduler
 Key: FLINK-35549
 URL: https://issues.apache.org/jira/browse/FLINK-35549
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / Coordination
Affects Versions: 1.20.0
Reporter: Matthias Pohl


This is the umbrella issue for implementing 
[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler]





[jira] [Created] (FLINK-35548) Add E2E tests for PubSubSinkV2

2024-06-07 Thread Ahmed Hamdy (Jira)
Ahmed Hamdy created FLINK-35548:
---

 Summary: Add E2E tests for PubSubSinkV2
 Key: FLINK-35548
 URL: https://issues.apache.org/jira/browse/FLINK-35548
 Project: Flink
  Issue Type: Sub-task
Reporter: Ahmed Hamdy
Assignee: Ahmed Hamdy
 Fix For: gcp-pubsub-3.2.0


Refactor Google PubSub source to use Unified Sink API 
[FLIP-143|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API]





[jira] [Created] (FLINK-35547) Sources support HybridSource implements

2024-06-07 Thread linqigeng (Jira)
linqigeng created FLINK-35547:
-

 Summary: Sources support HybridSource implements
 Key: FLINK-35547
 URL: https://issues.apache.org/jira/browse/FLINK-35547
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: linqigeng


Since replica (slave) instances generally have smaller specifications than the 
primary instance, they are more likely to restart when hitting performance 
bottlenecks (e.g. CPU, memory, disk), and they lag behind the primary instance by a 
certain delay. In order not to affect running Flink CDC jobs and to improve 
effectiveness, here is an idea:

Sources could adapt HybridSource, so that we can:
 * in the snapshot stage, have readers fetch data from the replica instances
 * after the snapshot phase ends, switch to reading the binlog from the primary 
instance





[jira] [Created] (FLINK-35546) Elasticsearch 8 connector fails fast for non-retryable bulk request items

2024-06-07 Thread Mingliang Liu (Jira)
Mingliang Liu created FLINK-35546:
-

 Summary: Elasticsearch 8 connector fails fast for non-retryable 
bulk request items
 Key: FLINK-35546
 URL: https://issues.apache.org/jira/browse/FLINK-35546
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Reporter: Mingliang Liu


Discussion thread: 
[https://lists.apache.org/thread/yrf0mmbch0lhk3rgkz94fr0x5qz2417l]

{quote}
Currently the Elasticsearch 8 connector retries all items if the request fails 
as a whole, and retries failed items if the request has partial failures 
[[1|https://github.com/apache/flink-connector-elasticsearch/blob/5d1f8d03e3cff197ed7fe30b79951e44808b48fe/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L152-L170]\].
 I think this infinite retrying might be problematic in some cases where 
retrying can never eventually succeed. For example, if the request failed with 400 (bad 
request) or 404 (not found), retries do not help. If there are too many 
non-retriable failed items, new requests will get processed less effectively. In 
extreme cases, it may stall the pipeline if in-flight requests are occupied by 
those failed items.

FLIP-451 proposes a timeout for retrying, which helps with unacknowledged 
requests, but it does not address the case where the request gets processed and failed 
items keep failing no matter how many times we retry. Correct me if I'm wrong.

One opinionated option is to fail fast for non-retriable errors like 400 / 404 
and to drop items for 409. Or we can allow users to configure "drop/fail" 
behavior for non-retriable errors. I prefer the latter. I checked how LogStash 
ingests data to Elasticsearch and it takes a similar approach for non-retriable 
errors 
[[2|https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/main/lib/logstash/plugin_mixins/elasticsearch/common.rb#L283-L304]\].
 In my day job, we have a dead-letter-queue in AsyncSinkWriter for failed 
entries that exhaust retries. I guess that is too specific to our setup and 
seems overkill here for the Elasticsearch connector.
{quote}
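
For illustration only (not the connector's actual API), the per-status-code handling discussed in the quoted thread could be expressed roughly like this:
{code:java}
public class BulkItemFailureClassifierSketch {

    enum Action { RETRY, FAIL_FAST, DROP }

    // Hypothetical per-item decision based on the HTTP status of a bulk response item.
    static Action classify(int httpStatus) {
        if (httpStatus == 400 || httpStatus == 404) {
            return Action.FAIL_FAST; // malformed request / missing target: retrying cannot help
        }
        if (httpStatus == 409) {
            return Action.DROP;      // version conflict: drop the item instead of retrying forever
        }
        return Action.RETRY;         // e.g. 429 or 5xx: transient, keep retrying (possibly bounded)
    }

    public static void main(String[] args) {
        System.out.println(classify(404)); // prints FAIL_FAST
    }
}
{code}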





[jira] [Created] (FLINK-35545) Miss 3.0.1 version in snapshot flink-cdc doc version list

2024-06-06 Thread Zhongqiang Gong (Jira)
Zhongqiang Gong created FLINK-35545:
---

 Summary: Miss 3.0.1 version in snapshot flink-cdc doc version list
 Key: FLINK-35545
 URL: https://issues.apache.org/jira/browse/FLINK-35545
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Zhongqiang Gong








[jira] [Created] (FLINK-35544) No description of Flink Operator in the Deployment of  Kubernetes document

2024-06-06 Thread linqigeng (Jira)
linqigeng created FLINK-35544:
-

 Summary: No description of Flink Operator in the Deployment of  
Kubernetes document
 Key: FLINK-35544
 URL: https://issues.apache.org/jira/browse/FLINK-35544
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: linqigeng


There is no description of the Flink Operator in the Kubernetes deployment 
document, so users don't know how to submit a pipeline job via the Flink Operator.

The link is:
{code:java}
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/deployment/kubernetes/
 {code}





[jira] [Created] (FLINK-35543) Upgrade Hive 2.3 connector to version 2.3.10

2024-06-06 Thread Cheng Pan (Jira)
Cheng Pan created FLINK-35543:
-

 Summary: Upgrade Hive 2.3 connector to version 2.3.10
 Key: FLINK-35543
 URL: https://issues.apache.org/jira/browse/FLINK-35543
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Reporter: Cheng Pan








[jira] [Created] (FLINK-35542) ClassNotFoundException when deserializing CheckpointedOffset

2024-06-06 Thread Jan Gurda (Jira)
Jan Gurda created FLINK-35542:
-

 Summary: ClassNotFoundException when deserializing 
CheckpointedOffset
 Key: FLINK-35542
 URL: https://issues.apache.org/jira/browse/FLINK-35542
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: jdbc-3.1.2
 Environment: Flink 1.19.0

Flink JDBC Connector 3.2-SNAPSHOT (commit 
2defbbcf4fc550a76dd9c664e1eed7d261e028ca)

JDK 11 (Temurin)
Reporter: Jan Gurda
 Fix For: jdbc-3.2.0


I use the latest flink-connector-jdbc code from the main branch, it's actually 
3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).

 

When jobs get interrupted while reading data from the JDBC source (for example, 
by a TaskManager outage), they cannot recover due to the following exception:
{code:java}
java.lang.RuntimeException: java.lang.ClassNotFoundException: 
org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
    at 
org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
    at 
org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
    at 
org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
    at 
org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
    at 
org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
    at 
org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown 
Source)
    at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown 
Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
    at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
    at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
    at 
org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109)
    at 
org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69

[jira] [Created] (FLINK-35541) Introduce retry limiting for AWS connector sinks

2024-06-06 Thread Aleksandr Pilipenko (Jira)
Aleksandr Pilipenko created FLINK-35541:
---

 Summary: Introduce retry limiting for AWS connector sinks
 Key: FLINK-35541
 URL: https://issues.apache.org/jira/browse/FLINK-35541
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / AWS, Connectors / DynamoDB, Connectors / 
Firehose, Connectors / Kinesis
Affects Versions: aws-connector-4.2.0
Reporter: Aleksandr Pilipenko


Currently, if the record write operation in the sink consistently fails with a 
retriable error, the sink will retry indefinitely. If the cause of the error is 
never resolved, this may lead to a stuck operator.





[jira] [Created] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-06 Thread Qigeng Lin (Jira)
Qigeng Lin created FLINK-35540:
--

 Summary: flink-cdc-pipeline-connector-mysql lost table which 
database and table with the same name
 Key: FLINK-35540
 URL: https://issues.apache.org/jira/browse/FLINK-35540
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Qigeng Lin


h1. Description
When the 'tables' parameter of a MySQL pipeline job contains a table whose 
database and table have the same name, like 'app.app', the job fails and the error 
message is like:
{code:java}
java.lang.IllegalArgumentException: Cannot find any table by the option 
'tables' = app.app {code}
h1. How to reproduce

Create a database and a table both named {{app}}, then submit a pipeline job 
defined by a YAML like this:
{code:java}
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app.app
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
{code}





[jira] [Created] (FLINK-35539) The artifactId of flink-migration-test-utils module has a typo

2024-06-06 Thread Zhen Wang (Jira)
Zhen Wang created FLINK-35539:
-

 Summary: The artifactId of  flink-migration-test-utils module has 
a typo
 Key: FLINK-35539
 URL: https://issues.apache.org/jira/browse/FLINK-35539
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.20.0
Reporter: Zhen Wang


The artifactId of the flink-migration-test-utils module has a typo.





[jira] [Created] (FLINK-35538) pipeline sink support caseSensitive

2024-06-06 Thread melin (Jira)
melin created FLINK-35538:
-

 Summary: pipeline sink support caseSensitive
 Key: FLINK-35538
 URL: https://issues.apache.org/jira/browse/FLINK-35538
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.0, cdc-3.1.1
Reporter: melin
 Attachments: image-2024-06-06-15-51-02-428.png

The source is case-sensitive while the sink is case-insensitive. Even Paimon doesn't 
allow capital letters.

!image-2024-06-06-15-51-02-428.png!

 





[jira] [Created] (FLINK-35537) Error parsing list of enum in legacy yaml configuration

2024-06-06 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35537:
---

 Summary: Error parsing list of enum in legacy yaml configuration 
 Key: FLINK-35537
 URL: https://issues.apache.org/jira/browse/FLINK-35537
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.19.0
Reporter: Zakelly Lan


In Flink 1.19.0, when I submit a job to a standalone cluster, the TM throws:
{code:java}
Caused by: java.lang.IllegalArgumentException: Could not parse value 
'[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'.
at 
org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827)
at 
org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312)
at 
org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361)
at 
org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449)
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
... 19 more
Caused by: java.lang.IllegalArgumentException: Could not parse value for enum 
class org.rocksdb.CompressionType. Expected one of: [[NO_COMPRESSION, 
SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, 
LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, 
DISABLE_COMPRESSION_OPTION]]
at 
org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502)
at java.util.Optional.orElseThrow(Optional.java:290)
at 
org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499)
at 
org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392)
at 
org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440)
at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at 
org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441)
at 
org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432)
at 
org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819)
at java.util.Optional.map(Optional.java:215)
at 
org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819)
... 28 more
{code}
I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in 
flink-conf.yaml. I also tried Flink 1.18.1, and it runs well there.
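
For reference, a minimal sketch of the lookup that fails, assuming the option is 
re-declared locally purely for illustration (the real ConfigOption is defined in 
the RocksDB state backend module):
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import org.rocksdb.CompressionType;

import java.util.List;

public class EnumListParsingSketch {

    // Illustrative re-declaration; not the actual Flink option definition.
    private static final ConfigOption<List<CompressionType>> COMPRESSION_PER_LEVEL =
            ConfigOptions.key("state.backend.rocksdb.compression.per.level")
                    .enumType(CompressionType.class)
                    .asList()
                    .noDefaultValue();

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Mimics the value taken from a legacy flink-conf.yaml entry.
        conf.setString("state.backend.rocksdb.compression.per.level", "NO_COMPRESSION");
        // Expected: [NO_COMPRESSION]. The reported failure happens when the TM performs
        // the equivalent lookup on 1.19.0 while parsing the legacy configuration file.
        List<CompressionType> levels = conf.get(COMPRESSION_PER_LEVEL);
        System.out.println(levels);
    }
}
{code}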



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35536) FileSystem sink on S3 produces invalid Avros when compaction is disabled

2024-06-05 Thread Juliusz Nadberezny (Jira)
Juliusz Nadberezny created FLINK-35536:
--

 Summary: FileSystem sink on S3 produces invalid Avros when 
compaction is disabled
 Key: FLINK-35536
 URL: https://issues.apache.org/jira/browse/FLINK-35536
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.19.0
Reporter: Juliusz Nadberezny


Compaction on the FileSystem sink on S3 uses the multipart upload process. 
When compaction is disabled after having been enabled, the files that were being 
kept open by a multipart upload and are then "released" with 
CompleteMultipartUpload end up broken.

The broken Avro files appear to have the Avro schema duplicated at the beginning 
of the file.

 

Steps to reproduce:
1. Deploy a job with a FileSystem sink with compaction enabled, writing to 
S3/MinIO (a sketch of such a table definition follows these steps).

2. Wait for the job to produce some output.

3. Redeploy the job with compaction disabled.

4. Wait for the multipart uploads to complete and verify the released files.
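
For step 1, a hedged sketch of such a sink via the Table API; the table names 
and the S3 path are illustrative, and the job assumes checkpointing and an S3 
filesystem plugin are configured on the cluster:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroCompactionSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Illustrative source so the sketch is self-contained.
        tEnv.executeSql(
                "CREATE TABLE some_source (id BIGINT, payload STRING) "
                        + "WITH ('connector' = 'datagen', 'rows-per-second' = '10')");

        // Step 1: Avro filesystem sink on S3/MinIO with compaction enabled.
        // Step 3 would redeploy this job with 'auto-compaction' = 'false'.
        tEnv.executeSql(
                "CREATE TABLE avro_sink (id BIGINT, payload STRING) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 's3://my-bucket/output',"
                        + " 'format' = 'avro',"
                        + " 'auto-compaction' = 'true',"
                        + " 'compaction.file-size' = '128MB')");

        tEnv.executeSql("INSERT INTO avro_sink SELECT id, payload FROM some_source");
    }
}
{code}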



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35535) Enable benchmark profiling in daily run

2024-06-05 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35535:
---

 Summary: Enable benchmark profiling in daily run
 Key: FLINK-35535
 URL: https://issues.apache.org/jira/browse/FLINK-35535
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 1.20.0


After FLINK-35534, flink-benchmark supports profiling. We could consider 
enabling this in the daily runs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35534) Support profiler for benchmarks

2024-06-05 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-35534:
---

 Summary: Support profiler for benchmarks
 Key: FLINK-35534
 URL: https://issues.apache.org/jira/browse/FLINK-35534
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Reporter: Zakelly Lan
 Fix For: 1.20.0


As JMH supports profiling during benchmarks, flink-benchmark could leverage 
this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35533) FLIP-459: Support Flink hybrid shuffle integration with Apache Celeborn

2024-06-05 Thread Yuxin Tan (Jira)
Yuxin Tan created FLINK-35533:
-

 Summary: FLIP-459: Support Flink hybrid shuffle integration with 
Apache Celeborn
 Key: FLINK-35533
 URL: https://issues.apache.org/jira/browse/FLINK-35533
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.20.0
Reporter: Yuxin Tan
Assignee: Yuxin Tan


This is the umbrella jira for 
[FLIP-459|https://cwiki.apache.org/confluence/display/FLINK/FLIP-459%3A+Support+Flink+hybrid+shuffle+integration+with+Apache+Celeborn].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard

2024-06-05 Thread Hong Liang Teoh (Jira)
Hong Liang Teoh created FLINK-35532:
---

 Summary: Prevent Cross-Site Authentication (XSA) attacks on Flink 
dashboard
 Key: FLINK-35532
 URL: https://issues.apache.org/jira/browse/FLINK-35532
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Web Frontend
Affects Versions: 1.19.0, 1.19.1
Reporter: Hong Liang Teoh
Assignee: Hong Liang Teoh
 Fix For: 1.19.2


As part of FLINK-33325, we introduced a new tab on the Flink dashboard to 
trigger the async profiler on the JobManager and TaskManager.

 

The HTML component introduced links out to async profiler page on Github -> 
[https://github.com/async-profiler/async-profiler/wiki].

However, the anchor element introduced does not follow the best practice for 
preventing XSA attacks, which is to set the attributes below:
{code:java}
target="_blank" rel="noopener noreferrer"{code}
We should add these attributes as best practice!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35531) Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream

2024-06-05 Thread dzcxzl (Jira)
dzcxzl created FLINK-35531:
--

 Summary: Avoid calling hsync in flush method in 
BaseHadoopFsRecoverableFsDataOutputStream
 Key: FLINK-35531
 URL: https://issues.apache.org/jira/browse/FLINK-35531
 Project: Flink
  Issue Type: Bug
Reporter: dzcxzl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35530) protobuf-format: support discarding unknown fields to improve deserialization performance

2024-06-05 Thread JingWei Li (Jira)
JingWei Li created FLINK-35530:
--

 Summary: protobuf-format: support discarding unknown fields to improve 
deserialization performance
 Key: FLINK-35530
 URL: https://issues.apache.org/jira/browse/FLINK-35530
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: JingWei Li


Add a protobuf option that allows calling 
_CodedStreamHelper.discardUnknownFields_ to save the performance overhead of 
deserializing unknown fields when decoding data.
{code:java}
create table source (...) with (
 'format' = 'protobuf',
 'protobuf.discard-unknown-field' = 'true'
){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35529) protobuf-format compatible with bad protobuf identifiers

2024-06-05 Thread JingWei Li (Jira)
JingWei Li created FLINK-35529:
--

 Summary: protobuf-format compatible with bad protobuf identifiers
 Key: FLINK-35529
 URL: https://issues.apache.org/jira/browse/FLINK-35529
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.2
Reporter: JingWei Li
 Fix For: 2.0.0


The main bug occurs during the decode process. The decode method is generated 
at runtime by Flink's codegen, and generating it requires some getter and setter 
methods of the protobuf object in order to construct the RowData. Currently, the 
getter and setter names are produced by string concatenation, using the "get" 
prefix and camelCase variable names. Some special characters can lead to 
incorrect generated getter and setter names, thus causing bugs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35528) Skip execution of interruptible mails when yielding

2024-06-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35528:
--

 Summary: Skip execution of interruptible mails when yielding
 Key: FLINK-35528
 URL: https://issues.apache.org/jira/browse/FLINK-35528
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Runtime / Task
Affects Versions: 1.20.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


When operators are yielding, for example waiting for async state access to 
complete before a checkpoint, it would be beneficial to not execute 
interruptible mails. Otherwise, the continuation mail for firing timers would be 
continuously re-enqueued. To achieve that, the MailboxExecutor must be aware of 
which mails are interruptible.

The easiest way to achieve this is to set MIN_PRIORITY for interruptible mails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35527) Polish quickstart guide & clean stale links in docs

2024-06-05 Thread yux (Jira)
yux created FLINK-35527:
---

 Summary: Polish quickstart guide & clean stale links in docs
 Key: FLINK-35527
 URL: https://issues.apache.org/jira/browse/FLINK-35527
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: yux
 Fix For: cdc-3.2.0


Currently, there are still a lot of stale links in the Flink CDC docs, including 
some download links pointing to Ververica Maven repositories. They need to be 
cleaned up to avoid confusing users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35526) Remove deprecated stedolan/jq Docker image from Flink e2e tests

2024-06-05 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-35526:
--

 Summary: Remove deprecated stedolan/jq Docker image from Flink e2e 
tests
 Key: FLINK-35526
 URL: https://issues.apache.org/jira/browse/FLINK-35526
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure
Reporter: Robert Metzger
Assignee: Robert Metzger


Our CI logs contain this warning: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60060=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=3828

{code}
latest: Pulling from stedolan/jq
[DEPRECATION NOTICE] Docker Image Format v1, and Docker Image manifest version 
2, schema 1 support will be removed in an upcoming release. Suggest the author 
of docker.io/stedolan/jq:latest to upgrade the image to the OCI Format, or 
Docker Image manifest v2, schema 2. More information at 
https://docs.docker.com/go/deprecated-image-specs/
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35525) HDFS delegation token fetched by custom DelegationTokenProvider is not passed to Yarn AM

2024-06-05 Thread Zhen Wang (Jira)
Zhen Wang created FLINK-35525:
-

 Summary: HDFS  delegation token fetched by custom 
DelegationTokenProvider is not passed to Yarn AM
 Key: FLINK-35525
 URL: https://issues.apache.org/jira/browse/FLINK-35525
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.18.1, 1.19.0
Reporter: Zhen Wang


I tried running Flink with a Hadoop proxy user by disabling HadoopModuleFactory 
and Flink's built-in token providers, and implementing a custom token provider.

However, only the HDFS token obtained by the hadoopfs provider was added in 
YarnClusterDescriptor, which resulted in a Yarn AM submission failure.

Discussion: https://github.com/apache/flink/pull/22009#issuecomment-2132676114



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35524) Clear connection pools when reader exits.

2024-06-05 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35524:
-

 Summary: Clear connection pools when reader exits.
 Key: FLINK-35524
 URL: https://issues.apache.org/jira/browse/FLINK-35524
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Currently, JdbcConnectionPools is a static instance, so the data source pools in 
it won't be recycled when a reader closes. This causes a memory leak.

```java
public class JdbcConnectionPools implements ConnectionPools {

private static final Logger LOG = 
LoggerFactory.getLogger(JdbcConnectionPools.class);

private static JdbcConnectionPools instance;
private final Map pools = new HashMap<>();
private static final Map POOL_FACTORY_MAP = 
new HashMap<>();
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35523) When using the Hive connector to read a Hive table in Parquet format, a null pointer exception is thrown.

2024-06-05 Thread Jichao Wang (Jira)
Jichao Wang created FLINK-35523:
---

 Summary: When using the Hive connector to read a Hive table in 
Parquet format, a null pointer exception is thrown.
 Key: FLINK-35523
 URL: https://issues.apache.org/jira/browse/FLINK-35523
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.19.0, 1.16.2
Reporter: Jichao Wang


When using the Hive connector to read a Hive table in Parquet format, a null 
pointer exception is thrown.
The exception stack information is as follows:
{code:text}
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception 
while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
 ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
 [flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_342]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_342]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_342]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
Caused by: java.io.IOException: java.lang.IndexOutOfBoundsException: Index: 1, 
Size: 1
at 
org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.fetch(HdpFileSourceSplitReader.java:40)
 ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
 ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
 ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
... 6 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[?:1.8.0_342]
at java.util.ArrayList.get(ArrayList.java:435) ~[?:1.8.0_342]
at 
org.apache.flink.hive.shaded.parquet.schema.GroupType.getType(GroupType.java:216)
 ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
 ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
 ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:277)
 ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:266)
 ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:256)
 ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:139)
 ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:75)
 ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:110)
 ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:65)
 ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
 ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
 ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1]
at 
org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.lambda$fetch$0(HdpFileSourceSplitReader.java:38)
 ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP

[jira] [Created] (FLINK-35522) The source task may get stuck after a failover occurs in batch jobs

2024-06-04 Thread xingbe (Jira)
xingbe created FLINK-35522:
--

 Summary: The source task may get stuck after a failover occurs in 
batch jobs
 Key: FLINK-35522
 URL: https://issues.apache.org/jira/browse/FLINK-35522
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.1, 1.19.0, 1.17.2, 1.20.0
Reporter: xingbe
 Fix For: 1.20.0


If the source task does not get assigned a split because the SplitEnumerator 
has no more splits, and a failover occurs during the closing process, the 
SourceCoordinatorContext will not resend the NoMoreSplit event to the newly 
started source task, causing the source vertex to remain stuck indefinitely. 
This case may only occur in batch jobs where speculative execution has been 
enabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35521) Flink FileSystem SQL Connector Generating SUCCESS File Multiple Times

2024-06-04 Thread EMERSON WANG (Jira)
EMERSON WANG created FLINK-35521:


 Summary: Flink FileSystem SQL Connector Generating SUCCESS File 
Multiple Times
 Key: FLINK-35521
 URL: https://issues.apache.org/jira/browse/FLINK-35521
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Affects Versions: 1.18.1
 Environment: Our PyFlink SQL jobs are running in AWS EKS environment.
Reporter: EMERSON WANG


Our Flink Table SQL job receives data from Kafka streams and then writes all 
partitioned data into the associated Parquet files under the same S3 folder 
through the filesystem SQL connector.

For the S3 filesystem SQL connector, sink.partition-commit.policy.kind was set 
to 'success-file' and sink.partition-commit.trigger was set to 'partition-time' 
(a sketch of this configuration follows). We found that the _SUCCESS file in the 
S3 folder was generated multiple times as successive partitions were committed.
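
A hedged sketch of such a sink definition via the Table API; column, table, and 
path names are illustrative, and a real job would additionally configure a 
watermark and 'partition.time-extractor.timestamp-pattern' for the 
partition-time trigger:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SuccessFileSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Partitioned Parquet sink that commits partitions on partition time and
        // writes a _SUCCESS marker file on each partition commit.
        tEnv.executeSql(
                "CREATE TABLE s3_sink (\n"
                        + "  user_id STRING,\n"
                        + "  amount DOUBLE,\n"
                        + "  dt STRING\n"
                        + ") PARTITIONED BY (dt) WITH (\n"
                        + "  'connector' = 'filesystem',\n"
                        + "  'path' = 's3://my-bucket/output',\n"
                        + "  'format' = 'parquet',\n"
                        + "  'sink.partition-commit.trigger' = 'partition-time',\n"
                        + "  'sink.partition-commit.delay' = '1 h',\n"
                        + "  'sink.partition-commit.policy.kind' = 'success-file'\n"
                        + ")");
    }
}
{code}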

Because all partitioned parquet files and _SUCCESS file are in the same S3 
folder and _SUCCESS file is used to trigger the downstream application, we 
really like the _SUCCESS file to be generated only once instead of multiple 
times after all partitions are committed and all parquet files are ready to be 
processed.
Thus, one _SUCCESS file can be used to trigger the downstream application only 
once instead of multiple times.

We knew we could set sink.partition-commit.trigger to 'process-time' to 
generate _SUCCESS file only once in the S3 folder; however, 'process-time' 
would not meet our business requirements.

We'd request the FileSystem SQL connector should support to the following new 
user case:
Even if sink.partition-commit.trigger is set to 'partition-time', _SUCCESS file 
will be generated only once after all partitions are committed and all output 
files are ready to be processed, and will be used to trigger the downstream 
application only once instead of multiple times.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35520) Nightly build can't compile as problems were detected from NoticeFileChecker

2024-06-04 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35520:
--

 Summary: Nightly build can't compile as problems were detected 
from NoticeFileChecker
 Key: FLINK-35520
 URL: https://issues.apache.org/jira/browse/FLINK-35520
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35519) Flink Job fails with SingleValueAggFunction received more than one element

2024-06-04 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-35519:


 Summary: Flink Job fails with SingleValueAggFunction received more 
than one element
 Key: FLINK-35519
 URL: https://issues.apache.org/jira/browse/FLINK-35519
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: Dawid Wysakowicz


When running a query:
{code}
select 
   (SELECT 
   t.id 
FROM raw_pagerduty_users, UNNEST(teams) AS t(id, type, summary, self, 
html_url))
from raw_pagerduty_users;
{code}
it is translated to:

{code}
Sink(table=[default_catalog.default_database.sink], fields=[EXPR$0])
+- Calc(select=[$f0 AS EXPR$0])
   +- Join(joinType=[LeftOuterJoin], where=[true], select=[c, $f0], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
  :- Exchange(distribution=[single])
  :  +- Calc(select=[c])
  : +- TableSourceScan(table=[[default_catalog, default_database, 
raw_pagerduty_users, project=[c, teams], metadata=[]]], fields=[c, 
teams])(reuse_id=[1])
  +- Exchange(distribution=[single])
 +- GroupAggregate(select=[SINGLE_VALUE(id) AS $f0])
+- Exchange(distribution=[single])
   +- Calc(select=[id])
  +- Correlate(invocation=[$UNNEST_ROWS$1($cor0.teams)], 
correlate=[table($UNNEST_ROWS$1($cor0.teams))], 
select=[c,teams,id,type,summary,self,html_url], rowType=[RecordType(BIGINT c, 
RecordType:peek_no_expand(VARCHAR(2147483647) id, VARCHAR(2147483647) type, 
VARCHAR(2147483647) summary, VARCHAR(2147483647) self, VARCHAR(2147483647) 
html_url) ARRAY teams, VARCHAR(2147483647) id, VARCHAR(2147483647) type, 
VARCHAR(2147483647) summary, VARCHAR(2147483647) self, VARCHAR(2147483647) 
html_url)], joinType=[INNER])
 +- Reused(reference_id=[1])
{code}

and it fails with:

{code}
java.lang.RuntimeException: SingleValueAggFunction received more than one 
element.
at GroupAggsHandler$150.accumulate(Unknown Source)
at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at 
org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:571)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:900)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:849)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35518) CI Bot doesn't run on PRs

2024-06-04 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35518:
--

 Summary: CI Bot doesn't run on PRs
 Key: FLINK-35518
 URL: https://issues.apache.org/jira/browse/FLINK-35518
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Piotr Nowojski


The CI bot doesn't want to run on my PR/branch. I tried force-pushes, rebases, 
asking the Flink bot to run, and closing and opening a new PR, but nothing helped:
https://github.com/apache/flink/pull/24868
https://github.com/apache/flink/pull/24883

I've heard others were having similar problems recently.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35517) CI pipeline triggered by pull request seems unstable

2024-06-04 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35517:
--

 Summary: CI pipeline triggered by pull request seems unstable
 Key: FLINK-35517
 URL: https://issues.apache.org/jira/browse/FLINK-35517
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo


The Flink CI pipeline triggered by pull requests seems somewhat unstable. 

For example, https://github.com/apache/flink/pull/24883 was filed 15 hours ago, 
but the CI report is still UNKNOWN.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35516) Update the Experimental annotation to PublicEvolving for files connector

2024-06-03 Thread RocMarshal (Jira)
RocMarshal created FLINK-35516:
--

 Summary: Update the Experimental annotation to PublicEvolving for 
files connector 
 Key: FLINK-35516
 URL: https://issues.apache.org/jira/browse/FLINK-35516
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / FileSystem
Reporter: RocMarshal


As described in https://issues.apache.org/jira/browse/FLINK-35496, we should 
update the annotations for the stable APIs in the files connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35515) Upgrade hive version to 4.0.0

2024-06-03 Thread vikasap (Jira)
vikasap created FLINK-35515:
---

 Summary: Upgrade hive version to 4.0.0
 Key: FLINK-35515
 URL: https://issues.apache.org/jira/browse/FLINK-35515
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.18.1
Reporter: vikasap
 Fix For: 1.18.2


Hive 4.0.0 was released recently. However, none of the major Flink versions 
work with it. Filing this so that the major Flink versions' Flink SQL and Table 
API can work with the new version of the Hive metastore.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35514) Add Flink CDC Channel to Apache Flink Slack Workspace

2024-06-03 Thread Zhongqiang Gong (Jira)
Zhongqiang Gong created FLINK-35514:
---

 Summary: Add Flink CDC Channel to Apache Flink Slack Workspace
 Key: FLINK-35514
 URL: https://issues.apache.org/jira/browse/FLINK-35514
 Project: Flink
  Issue Type: Bug
Reporter: Zhongqiang Gong


DISCUSS thread : 
https://lists.apache.org/thread/gqzrs3c0j9k7c5m9m5k2slomgorrqrwf



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35513) ArtifactFetchManager unit tests are failing

2024-06-03 Thread Elphas Toringepi (Jira)
Elphas Toringepi created FLINK-35513:


 Summary: ArtifactFetchManager unit tests are failing
 Key: FLINK-35513
 URL: https://issues.apache.org/jira/browse/FLINK-35513
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.19.0
Reporter: Elphas Toringepi


The following unit tests in ArtifactFetchManagerTest are failing

 * testFileSystemFetchWithAdditionalUri()

 * testHttpFetch()
 * testMixedArtifactFetch()

 
{code:java}
Test 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testFileSystemFetchWithAdditionalUri[testFileSystemFetchWithAdditionalUri()]
 failed with:
java.lang.AssertionError: 
Expecting actual not to be empty
 at 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.getFlinkClientsJar(ArtifactFetchManagerTest.java:251)
 at 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testFileSystemFetchWithAdditionalUri(ArtifactFetchManagerTest.java:104)
 ...


Test 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testMixedArtifactFetch[testMixedArtifactFetch()]
 failed with:
java.lang.AssertionError: 
Expecting actual not to be empty
 at 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.getFlinkClientsJar(ArtifactFetchManagerTest.java:251)
 at 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testMixedArtifactFetch(ArtifactFetchManagerTest.java:149)
 ...


Test 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testHttpFetch[testHttpFetch()]
 failed with:
java.lang.AssertionError: 
Expecting actual not to be empty
 at 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.getFlinkClientsJar(ArtifactFetchManagerTest.java:251)
 at 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testHttpFetch(ArtifactFetchManagerTest.java:124)
 ... 
{code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35512) ArtifactFetchManagerTest unit tests fail

2024-06-03 Thread Hong Liang Teoh (Jira)
Hong Liang Teoh created FLINK-35512:
---

 Summary: ArtifactFetchManagerTest unit tests fail
 Key: FLINK-35512
 URL: https://issues.apache.org/jira/browse/FLINK-35512
 Project: Flink
  Issue Type: Technical Debt
Affects Versions: 1.19.1
Reporter: Hong Liang Teoh
 Fix For: 1.19.1


The below three tests from *ArtifactFetchManagerTest* seem to fail consistently:
 * ArtifactFetchManagerTest.testFileSystemFetchWithAdditionalUri
 * ArtifactFetchManagerTest.testMixedArtifactFetch
 * ArtifactFetchManagerTest.testHttpFetch

The error printed is
{code:java}
java.lang.AssertionError:
Expecting actual not to be empty
    at 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.getFlinkClientsJar(ArtifactFetchManagerTest.java:248)
    at 
org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testMixedArtifactFetch(ArtifactFetchManagerTest.java:146)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) 
{code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35511) Enhance metrics. The incremental and full phases count the number of records separately

2024-06-03 Thread melin (Jira)
melin created FLINK-35511:
-

 Summary: Enhance metrics. The incremental and full phases count 
the number of records separately
 Key: FLINK-35511
 URL: https://issues.apache.org/jira/browse/FLINK-35511
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: melin
 Attachments: image-2024-06-03-22-06-38-591.png

For reference, the metrics reported by DataWorks real-time synchronization:

!image-2024-06-03-22-06-38-591.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35510) Implement basic incremental checkpoint for ForStStateBackend

2024-06-03 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-35510:
---

 Summary: Implement basic incremental checkpoint for 
ForStStateBackend
 Key: FLINK-35510
 URL: https://issues.apache.org/jira/browse/FLINK-35510
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / State Backends
Reporter: Feifan Wang


Use the low-level DB API to implement a basic incremental checkpoint for 
ForStStateBackend, following these steps (a hedged sketch follows the list):
 # db.disableFileDeletions()
 # db.getLiveFiles(true)
 # db.enableFileDeletions(false)
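
A hedged sketch of that sequence against the RocksDB Java API (versions where 
enableFileDeletions still takes a force flag); the upload of the listed files to 
checkpoint storage is only described in a comment, as an assumption about how 
the result would be used:
{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class IncrementalSnapshotSketch {

    /** Lists the live files of the given DB while file deletions are paused. */
    public static RocksDB.LiveFiles snapshotLiveFiles(RocksDB db) throws RocksDBException {
        // 1. Stop background compaction from deleting files that belong to the snapshot.
        db.disableFileDeletions();
        try {
            // 2. Flush the memtable and list the current SST/MANIFEST files. The returned
            //    relative paths (plus manifestFileSize) would then be copied/uploaded to
            //    the checkpoint storage by the state backend.
            return db.getLiveFiles(true);
        } finally {
            // 3. Re-enable deletions; 'false' means do not force-purge obsolete files.
            db.enableFileDeletions(false);
        }
    }
}
{code}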

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35509) Slack community invite link has expired

2024-06-03 Thread Ufuk Celebi (Jira)
Ufuk Celebi created FLINK-35509:
---

 Summary: Slack community invite link has expired
 Key: FLINK-35509
 URL: https://issues.apache.org/jira/browse/FLINK-35509
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Ufuk Celebi


The Slack invite link on the website has expired.

I've generated a new invite link without expiration here: 
[https://join.slack.com/t/apache-flink/shared_invite/zt-2k0fdioxx-D0kTYYLh3pPjMu5IItqx3Q]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35508) Use OceanBase LTS version Docker image in testing

2024-06-03 Thread He Wang (Jira)
He Wang created FLINK-35508:
---

 Summary: Use OceanBase LTS version Docker image in testing
 Key: FLINK-35508
 URL: https://issues.apache.org/jira/browse/FLINK-35508
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: He Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35507) Support For Individual Job Level Resource Allocation in Session Cluster in k8s

2024-06-02 Thread Amarjeet (Jira)
Amarjeet created FLINK-35507:


 Summary: Support For Individual Job Level Resource Allocation in 
Session Cluster in k8s
 Key: FLINK-35507
 URL: https://issues.apache.org/jira/browse/FLINK-35507
 Project: Flink
  Issue Type: New Feature
  Components: Deployment / Kubernetes
Reporter: Amarjeet


We could have a setup like Spark, where in a Spark cluster we can configure 
individual job-level settings for the resources a job gets, from memory to cores. 
It should also support dynamic memory allocation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled

2024-06-02 Thread elon_X (Jira)
elon_X created FLINK-35506:
--

 Summary: disable kafka auto-commit and rely on flink’s 
checkpointing if both are enabled
 Key: FLINK-35506
 URL: https://issues.apache.org/jira/browse/FLINK-35506
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.16.1
Reporter: elon_X


When I use KafkaSource for consuming topics and set the Kafka parameter 
{{enable.auto.commit=true}}, while also enabling checkpointing for the task, I 
notice that both will commit offsets. Should Kafka's auto-commit be disabled 
when Flink checkpointing is enabled, similar to how it's done with 
FlinkKafkaConsumer?
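
A hedged sketch of the combination described, with illustrative broker, topic, 
and group names; here both the Kafka client's auto-commit and Flink's 
checkpoint-based offset commit end up active:
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AutoCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // Flink commits offsets on each checkpoint

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("events")
                        .setGroupId("my-group")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        // Kafka-client-level auto commit, passed through as a consumer
                        // property; the proposal is to disable/ignore it when
                        // checkpointing is enabled.
                        .setProperty("enable.auto.commit", "true")
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("auto-commit-sketch");
    }
}
{code}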



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35505) RegionFailoverITCase.testMultiRegionFailover has never ever restored state

2024-06-02 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35505:
--

 Summary: RegionFailoverITCase.testMultiRegionFailover has never 
ever restored state
 Key: FLINK-35505
 URL: https://issues.apache.org/jira/browse/FLINK-35505
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35504) Improve Elasticsearch 8 connector observability

2024-06-01 Thread Mingliang Liu (Jira)
Mingliang Liu created FLINK-35504:
-

 Summary: Improve Elasticsearch 8 connector observability
 Key: FLINK-35504
 URL: https://issues.apache.org/jira/browse/FLINK-35504
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Affects Versions: elasticsearch-3.1.0
Reporter: Mingliang Liu


Currently all logs are at DEBUG level. Some of those messages are very helpful 
for tracking progress and errors and can be changed to INFO or WARN level. We 
can also include error details in the DEBUG-level messages so it's easier to 
debug with more context.

Meanwhile, we can update the metrics to track {{numRecordsSend}}. FWIW, the base 
class already tracks the following metrics, so we don't need to implement them: 
the {{CurrentSendTime}} gauge and the {{NumBytesOut}} and {{NumRecordsOut}} 
counters.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35503) OracleE2eITCase fails with error ORA-12528 on Mac M2

2024-06-01 Thread Saketh Kurnool (Jira)
Saketh Kurnool created FLINK-35503:
--

 Summary: OracleE2eITCase fails with error ORA-12528 on Mac M2
 Key: FLINK-35503
 URL: https://issues.apache.org/jira/browse/FLINK-35503
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
 Environment:  
 * Mac M2 (Apple Silicon)
 * using docker desktop with Rosetta enabled for amd64 emulation

 
Reporter: Saketh Kurnool
 Attachments: com.ververica.cdc.connectors.tests.OracleE2eITCase.txt, 
oracle-docker-setup-logs.txt

Hello Flink CDC community,

I am attempting to run `OracleE2eITCase` (in the cdc source connector e2e 
tests), and I am running into the following runtime exception: 
{code:java}
java.sql.SQLException: 
Listener refused the connection with the following error:
ORA-12528, TNS:listener: all appropriate instances are blocking new connections
 
    at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:854)
    at 
oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793)
    at 
oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57)
    at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747)
    at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:562)
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:677)
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:228)
    at 
com.ververica.cdc.connectors.tests.OracleE2eITCase.getOracleJdbcConnection(OracleE2eITCase.java:197)
    at 
com.ververica.cdc.connectors.tests.OracleE2eITCase.testOracleCDC(OracleE2eITCase.java:149)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
    at 
org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
Caused by: oracle.net.ns.NetException: Listener refused the connection with the 
following error:
ORA-12528, TNS:listener: all appropriate instances are blocking new connections
 
    at oracle.net.ns.NSProtocolNIO.negotiateConnection(NSProtocolNIO.java:284)
    at oracle.net.ns.NSProtocol.connect(NSProtocol.java:340)
    at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1596)
    at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:588)
    ... 11 more{code}
I have attached the test results to this issue.

`OracleE2eITCase` runs the `goodboy008/oracle-19.3.0-ee:non-cdb` docker image. 
I am able to reproduce the same issue when I run this docker image locally - my 
observation is that the dockerized Oracle DB instance is not being set up 
properly, as I notice another ORA error in the setup logs (`ORA-03113: end-of-file on 
communication channel`). I have also attached the logs from the docker image 
setup to this issue. To reproduce the ORA-12528 issue locally, I:
 * ran: `docker run goodboy008/oracle-19.3.0-ee:non-cdb`
 * ssh'ed into the db pod
 * ran: `sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba`

Any insight/workaround on getting this e2e test and the docker image running on 
my machine would be much appreciated. I'm also happy to provide any other 
information regarding my setup in the comments. Thank you!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35502) compress the checkpoint metadata ZK/ETCD HA Services

2024-06-01 Thread Ying Z (Jira)
Ying Z created FLINK-35502:
--

 Summary: compress the checkpoint metadata ZK/ETCD HA Services
 Key: FLINK-35502
 URL: https://issues.apache.org/jira/browse/FLINK-35502
 Project: Flink
  Issue Type: Improvement
Reporter: Ying Z


In the implementation of Flink HA, the metadata of checkpoints is stored in 
either Zookeeper (ZK HA) or ETCD (K8S HA), such as:

```
checkpointID-0036044: 
checkpointID-0036045: 
...
...
```

However, neither of these is designed to store large amounts of data. If the 
[state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
 setting is set too large, it can easily cause problems in ZK/ETCD. 

The error log when state.checkpoints.num-retained is set to 1500:

```
Caused by: org.apache.flink.util.SerializedThrowable: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT 
at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
Message: ConfigMap "xxx-jobmanager-leader" is invalid: 0J:
Too long: must have at most 1048576 bytes. Received status: 
Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=(StatusCause(field=[J, message=Too long: must have 
at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})l, 
group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, 
uid=null, additionalProperties=(}), kind=Status, message=ConfigMap 
"xxx-jobmanager-leader" is invalid: [): Too long: must have at most 1048576 
bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties=(}).
```

In Flink's code, all checkpoint metadata is updated at the same time, and the 
checkpoint metadata contains many repeated bytes, so it can achieve a very good 
compression ratio.

Therefore, I suggest compressing the data when writing checkpoints and 
decompressing it when reading, to reduce storage pressure and improve IO 
efficiency.

Here is sample code, which reduces the metadata size from about 1 MB to 30 KB.

```java
// Map -> JSON
ObjectMapper objectMapper = new ObjectMapper();
String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
// compress and Base64-encode
String compressedBase64 = compressAndEncode(checkpointJson);
compressedData.put("checkpoint-all", compressedBase64);

private static String compressAndEncode(String data) throws IOException {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
        gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
    }
    byte[] compressedData = outputStream.toByteArray();
    return Base64.getEncoder().encodeToString(compressedData);
}
```
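
For completeness, a hedged sketch of the read path implied above (Base64-decode, 
then gunzip back to the original JSON string); the class and method names are 
illustrative:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;

public final class CheckpointMetadataCodec {

    /** Inverse of compressAndEncode above: Base64-decode, then gunzip to the JSON string. */
    static String decodeAndDecompress(String compressedBase64) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(compressedBase64);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```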



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35501) Use common thread pools when transferring RocksDB state files

2024-05-31 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35501:
-

 Summary: Use common thread pools when transferring RocksDB state 
files
 Key: FLINK-35501
 URL: https://issues.apache.org/jira/browse/FLINK-35501
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.20.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0


Currently, each RocksDB state backend creates an executor backed by a thread 
pool.

This makes it difficult to control the total number of threads per TM because 
it might have at least one task per slot and theoretically, many state backends 
per task (because of chaining).

Additionally, using a common thread pool makes it possible to indirectly control 
the load on the underlying DFS (e.g. the total number of requests to S3 from a TM).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35500) DynamoDb SinkWriter fails to delete elements due to key not found

2024-05-31 Thread Ahmed Hamdy (Jira)
Ahmed Hamdy created FLINK-35500:
---

 Summary: DynamoDb SinkWriter fails to delete elements due to key 
not found
 Key: FLINK-35500
 URL: https://issues.apache.org/jira/browse/FLINK-35500
 Project: Flink
  Issue Type: Bug
  Components: Connectors / DynamoDB
Affects Versions: aws-connector-4.2.0, aws-connector-4.1.0, 
aws-connector-4.0.0
Reporter: Ahmed Hamdy
 Fix For: aws-connector-4.4.0


h2. Description
When DynamoDbSink is used with CDC sources, it fails to process {{DELETE}} 
records and throws 
{quote}org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
 The provided key element does not match the schema{quote}

This is due to {{DynamoDbSinkWriter}} passing the whole DynamoDB item as the key 
instead of the constructed primary key[1]; a sketch of the distinction follows.

Note: The issue was reported on the user mailing list[2].
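
A hedged illustration of the distinction, using the AWS SDK v2 model classes and 
the sample table from the repro steps below (attribute values are illustrative):
{code:java}
import java.util.Map;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;

public class DynamoDbDeleteKeySketch {
    public static void main(String[] args) {
        // Full item as carried by the CDC "before" row.
        Map<String, AttributeValue> fullItem = Map.of(
                "userId", AttributeValue.builder().s("c").build(),
                "orderId", AttributeValue.builder().n("3").build(),
                "price", AttributeValue.builder().n("9").build());

        // What the sink writer reportedly sends today: every attribute as the key,
        // which DynamoDB rejects because it does not match the key schema.
        DeleteRequest rejected = DeleteRequest.builder().key(fullItem).build();

        // What DynamoDB expects: only the attributes of the table's key schema
        // (here the single HASH key "userId").
        Map<String, AttributeValue> primaryKey =
                Map.of("userId", AttributeValue.builder().s("c").build());
        DeleteRequest accepted = DeleteRequest.builder().key(primaryKey).build();

        System.out.println(rejected.key().size() + " vs " + accepted.key().size());
    }
}
{code}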

h2. Steps to Reproduce

(1) Create a new DynamoDB table in AWS.  Command line:
aws dynamodb create-table \
  --table-name orders \
  --attribute-definitions AttributeName=userId,AttributeType=S \
  --key-schema AttributeName=userId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST

(2) Create an input file in Debezium-JSON format with the following rows to 
start:
{"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
{"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
{"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
{"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}

(3) Start the Flink SQL Client, and run the following, substituting in the 
proper local paths for the Dynamo Connector JAR file and for this local sample 
input file:

ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
SET 'execution.runtime-mode' = 'streaming';
SET 'sql-client.execution.result-mode' = 'changelog';

CREATE TABLE Orders_CDC(
  orderId BIGINT,
  price float,
  userId STRING
 ) WITH (
   'connector' = 'filesystem',
   'path' = '/path/to/input_file.jsonl',
   'format' = 'debezium-json'
 );

CREATE TABLE Orders_Dynamo (
  orderId BIGINT,
  price float,
  userId STRING,
  PRIMARY KEY (userId) NOT ENFORCED
) PARTITIONED BY ( userId )
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'orders',
  'aws.region' = 'us-east-1'
);

INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;

(4) At this point, we will see that things currently all work properly, and 
these 4 rows are inserted properly to Dynamo, because they are "Insert" 
operations.   So far, so good!

(5) Now, add the following row to the input file.  This represents a deletion 
in Debezium format, which should then cause a Deletion on the corresponding 
DynamoDB table:
{"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}

(6) Re-Run the SQL statement:
INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;

h3. References
1-https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
2- https://lists.apache.org/thread/ysvctpvn6n9kn0qlf5b24gxchfg64ylf



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35499) EventTimeWindowCheckpointingITCase times out due to Checkpoint expired before completing

2024-05-31 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-35499:
---

 Summary: EventTimeWindowCheckpointingITCase times out due to 
Checkpoint expired before completing
 Key: FLINK-35499
 URL: https://issues.apache.org/jira/browse/FLINK-35499
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.20.0
Reporter: Ryan Skraba


* 1.20 AdaptiveScheduler / Test (module: tests) 
https://github.com/apache/flink/actions/runs/9311892945/job/25632037990#step:10:8702
* 1.20 Default (Java 8) / Test (module: tests) 
https://github.com/apache/flink/actions/runs/9275522134/job/25520829730#step:10:8264

Going into the logs, we see the following error occurs:
{code:java}

Test testTumblingTimeWindow[statebackend type =ROCKSDB_INCREMENTAL, 
buffersPerChannel = 
2](org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase) is 
running.

<...>
20:24:23,562 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 22 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1716927863562 for job 
15d0a663cb415b09b9a68ccc40640c6d.
20:24:23,609 [jobmanager-io-thread-2] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
checkpoint 22 for job 15d0a663cb415b09b9a68ccc40640c6d (2349132 bytes, 
checkpointDuration=43 ms, finalizationTime=4 ms).
20:24:23,610 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 23 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1716927863610 for job 
15d0a663cb415b09b9a68ccc40640c6d.
20:24:23,620 [jobmanager-io-thread-2] WARN  
org.apache.flink.runtime.jobmaster.JobMaster [] - Error while 
processing AcknowledgeCheckpoint message
java.lang.IllegalStateException: Attempt to reference unknown state: 
a9a90973-4ee5-384f-acef-58a7c7560920
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.state.SharedStateRegistryImpl.registerReference(SharedStateRegistryImpl.java:97)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.state.SharedStateRegistry.registerReference(SharedStateRegistry.java:53)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle.registerSharedStates(IncrementalRemoteKeyedStateHandle.java:289)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState.registerSharedState(OperatorSubtaskState.java:243)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState.registerSharedStates(OperatorSubtaskState.java:226)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.TaskStateSnapshot.registerSharedStates(TaskStateSnapshot.java:193)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1245)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139)
 ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:64) 
~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_392]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_392]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_392]
20:24:23,663 [Source: Custom Source (1/1)#1] INFO  
org.apache.flink.runtime.taskmanager.Task[] - Source: 
Custom Source (1/1)#1 
(bc4de0d149fba0ca825771ff7eeae08d_bc764cd8ddf7a0cff126f51c16239658_0_1) 
switched from RUNNING to FINISHED.
20:24:23,663 [Source: Custom Source (1/1)#1] INFO  
org.apache.flink.runtime.taskmanager.Task[] - Freeing task 
resources for Source: Custom Source (1/1)#1 
(bc4de0d149fba0ca825771ff7eeae08d_bc764cd8ddf7a0cff126f51c16239658_0_1).
20:24:23,663 [flink-pekko.actor.default-dispatcher-8] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - 
Un-registering task and sending final execution state FINISHED to JobManager 
fo

[jira] [Created] (FLINK-35498) Unexpected argument name conflict error when extracting method params from udf

2024-05-31 Thread lincoln lee (Jira)
lincoln lee created FLINK-35498:
---

 Summary: Unexpected argument name conflict error when extracting 
method params from udf
 Key: FLINK-35498
 URL: https://issues.apache.org/jira/browse/FLINK-35498
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.0, 1.20.0
Reporter: lincoln lee
Assignee: xuyang


Follow the steps to reproduce the error:

test case:

{code}
util.addTemporarySystemFunction("myudf", new TestXyz)
util.tableEnv.explainSql("select myudf(f1, f2) from t")
{code}

 

udf: TestXyz 

{code}
public class TestXyz extends ScalarFunction {
public String eval(String s1, String s2) {
String localV1;

if (s1 == null) {
if (s2 != null) {
localV1 = s2;
} else {
localV1 = s2 + s1;
}
} else {
if ("xx".equals(s2)) {
localV1 = s1.length() >= s2.length() ? s1 : s2;
} else {
localV1 = s1;
}
}
if (s1 == null) {
return s2 + localV1;
}
if (s2 == null) {
return s1;
}
return s1.length() >= s2.length() ? s1 + localV1 : s2;
}
}
{code}

 

error stack:

{code}

Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a 
type inference from method:
public java.lang.String 
org.apache.flink.table.planner.runtime.utils.TestXyz.eval(java.lang.String,java.lang.String)
    at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at 
org.apache.flink.table.types.extraction.BaseMappingExtractor.extractResultMappings(BaseMappingExtractor.java:154)
    at 
org.apache.flink.table.types.extraction.BaseMappingExtractor.extractOutputMapping(BaseMappingExtractor.java:100)
    ... 53 more
Caused by: org.apache.flink.table.api.ValidationException: Argument name 
conflict, there are at least two argument names that are the same.
    at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:357)
    at 
org.apache.flink.table.types.extraction.FunctionSignatureTemplate.of(FunctionSignatureTemplate.java:73)
    at 
org.apache.flink.table.types.extraction.BaseMappingExtractor.lambda$createParameterSignatureExtraction$9(BaseMappingExtractor.java:381)
    at 
org.apache.flink.table.types.extraction.BaseMappingExtractor.putExtractedResultMappings(BaseMappingExtractor.java:298)
    at 
org.apache.flink.table.types.extraction.BaseMappingExtractor.collectMethodMappings(BaseMappingExtractor.java:244)
    at 
org.apache.flink.table.types.extraction.BaseMappingExtractor.extractResultMappings(BaseMappingExtractor.java:137)
    ... 54 more

{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35497) The wrong enum value was used to get month in timestampDiff

2024-05-31 Thread haishui (Jira)
haishui created FLINK-35497:
---

 Summary: The wrong enum value was used to get month in 
timestampDiff
 Key: FLINK-35497
 URL: https://issues.apache.org/jira/browse/FLINK-35497
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: haishui


In 
[SystemFunctionUtils#timestampDiff](https://github.com/apache/flink-cdc/blob/master/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java#L125):
 

{code:java}
case "MONTH":
return to.get(Calendar.YEAR) * 12
+ to.get(Calendar.MONDAY)
- (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONDAY)); {code}
Calendar.MONDAY should be replaced with Calendar.MONTH here.

This does not affect the calculation results, because Calendar.MONDAY = 
Calendar.MONTH = 2.
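
For reference, the corrected branch (same arithmetic, only the intended constant):

{code:java}
case "MONTH":
    return to.get(Calendar.YEAR) * 12
            + to.get(Calendar.MONTH)
            - (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONTH));
{code}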

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35496) The annotations of the new JDBC connector should be changed to non-Public/non-PublicEvolving

2024-05-31 Thread RocMarshal (Jira)
RocMarshal created FLINK-35496:
--

 Summary: The annotations of the new JDBC connector should be 
changed to non-Public/non-PublicEvolving
 Key: FLINK-35496
 URL: https://issues.apache.org/jira/browse/FLINK-35496
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / JDBC
Reporter: RocMarshal


In general, we use the {{Experimental}} annotation instead of {{PublicEvolving}} or {{Public}} for new features or new APIs. But {{JdbcSink}} and {{JdbcSource}} (merged) were marked as {{PublicEvolving}} in their first version. [~fanrui] commented on this in the original PR [1].

[1] [https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1621857589]
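
As a sketch of the proposed change (class name illustrative, not the actual connector source), a freshly introduced API would carry {{@Experimental}} rather than {{@PublicEvolving}}:

{code:java}
import org.apache.flink.annotation.Experimental;

/**
 * Illustrative only: a newly added connector API starts out as
 * {@code @Experimental} instead of {@code @PublicEvolving}.
 */
@Experimental
public class JdbcSourceExample {
}
{code}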

CC [~eskabetxe] [~Sergey Nuyanzin] [~fanrui] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35495) The native metrics for column family are not reported

2024-05-30 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35495:
--

 Summary: The native metrics for column family are not reported
 Key: FLINK-35495
 URL: https://issues.apache.org/jira/browse/FLINK-35495
 Project: Flink
  Issue Type: Sub-task
Reporter: Yanfei Lei






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35494) Reorganize sources

2024-05-30 Thread Jira
João Boto created FLINK-35494:
-

 Summary: Reorganize sources
 Key: FLINK-35494
 URL: https://issues.apache.org/jira/browse/FLINK-35494
 Project: Flink
  Issue Type: Sub-task
Reporter: João Boto






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35493) Make max history age and count configurable for FlinkStateSnapshot resources

2024-05-30 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35493:


 Summary: Make max history age and count configurable for 
FlinkStateSnapshot resources
 Key: FLINK-35493
 URL: https://issues.apache.org/jira/browse/FLINK-35493
 Project: Flink
  Issue Type: Sub-task
  Components: Kubernetes Operator
Reporter: Mate Czagany






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35492) Add metrics for FlinkStateSnapshot resources

2024-05-30 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35492:


 Summary: Add metrics for FlinkStateSnapshot resources
 Key: FLINK-35492
 URL: https://issues.apache.org/jira/browse/FLINK-35492
 Project: Flink
  Issue Type: Sub-task
  Components: Kubernetes Operator
Reporter: Mate Czagany






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35491) [JUnit5 Migration] Module: Flink CDC modules

2024-05-30 Thread Muhammet Orazov (Jira)
Muhammet Orazov created FLINK-35491:
---

 Summary: [JUnit5 Migration] Module: Flink CDC modules
 Key: FLINK-35491
 URL: https://issues.apache.org/jira/browse/FLINK-35491
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Muhammet Orazov


Migrate JUnit 4 tests to JUnit 5 for the following modules (a minimal before/after sketch follows the list):
 * flink-cdc-common
 * flink-cdc-composer
 * flink-cdc-runtime
 * flink-cdc-connect/flink-cdc-pipeline-connectors
 * flink-cdc-e2e-tests
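
For reference, a minimal sketch of the kind of mechanical change involved (plain JUnit Jupiter, not taken from the CDC code base):

{code:java}
// JUnit 4                               ->  JUnit 5 (Jupiter)
// import org.junit.Before;              ->  import org.junit.jupiter.api.BeforeEach;
// import org.junit.Test;                ->  import org.junit.jupiter.api.Test;
// import static org.junit.Assert.*;     ->  import static org.junit.jupiter.api.Assertions.*;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ExampleMigratedTest {

    private StringBuilder buffer;

    @BeforeEach // was @Before in JUnit 4
    void setUp() {
        buffer = new StringBuilder("cdc");
    }

    @Test // test classes and methods no longer need to be public in JUnit 5
    void appendsSuffix() {
        buffer.append("-runtime");
        assertEquals("cdc-runtime", buffer.toString());
    }
}
{code}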



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35490) [JUnit5 Migration] Module: Flink CDC flink-cdc-connect/flink-cdc-source-connectors

2024-05-30 Thread Muhammet Orazov (Jira)
Muhammet Orazov created FLINK-35490:
---

 Summary: [JUnit5 Migration] Module: Flink CDC 
flink-cdc-connect/flink-cdc-source-connectors
 Key: FLINK-35490
 URL: https://issues.apache.org/jira/browse/FLINK-35490
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Muhammet Orazov


Migrate JUnit 4 tests to JUnit 5 in the following Flink CDC module:

- flink-cdc-connect/flink-cdc-source-connectors



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35489) Add capability to set min taskmanager.memory.managed.size when enabling autotuning

2024-05-30 Thread Nicolas Fraison (Jira)
Nicolas Fraison created FLINK-35489:
---

 Summary: Add capability to set min taskmanager.memory.managed.size 
when enabling autotuning
 Key: FLINK-35489
 URL: https://issues.apache.org/jira/browse/FLINK-35489
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: 1.8.0
Reporter: Nicolas Fraison


We have enabled the autotuning feature on one of our Flink jobs with the config below:
{code:java}
# Autoscaler configuration
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: 1m
job.autoscaler.metrics.window: 10m
job.autoscaler.target.utilization: "0.8"
job.autoscaler.target.utilization.boundary: "0.1"
job.autoscaler.restart.time: 2m
job.autoscaler.catch-up.duration: 10m
job.autoscaler.memory.tuning.enabled: true
job.autoscaler.memory.tuning.overhead: 0.5
job.autoscaler.memory.tuning.maximize-managed-memory: true{code}
During a scale-down, the autotuning decided to give all the memory to the JVM (scaling the heap by roughly 2x), setting taskmanager.memory.managed.size to 0b.
Here is the config that was computed by the autotuning for a TM running on a 4GB pod:
{code:java}
taskmanager.memory.network.max: 4063232b
taskmanager.memory.network.min: 4063232b
taskmanager.memory.jvm-overhead.max: 433791712b
taskmanager.memory.task.heap.size: 3699934605b
taskmanager.memory.framework.off-heap.size: 134217728b
taskmanager.memory.jvm-metaspace.size: 22960020b
taskmanager.memory.framework.heap.size: "0 bytes"
taskmanager.memory.flink.size: 3838215565b
taskmanager.memory.managed.size: 0b {code}
This has led to issues starting the TM, because we rely on a javaagent that performs memory allocation outside of the JVM (via C bindings).

Tuning the overhead or disabling scale-down-compensation.enabled could have helped for that particular event, but it can lead to other issues, as it could result in too little heap size being computed.

It would be interesting to be able to set a minimum memory.managed.size to be taken into account by the autotuning.
What do you think about this? Do you think some other specific config should have been applied to avoid this issue?
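
A purely illustrative sketch of what the requested option could look like (this key does not exist today; the name is hypothetical):

{code:java}
# Hypothetical key, shown only to illustrate the request: a floor that memory
# autotuning would never go below when computing taskmanager.memory.managed.size.
job.autoscaler.memory.tuning.managed.min: 512mb
{code}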



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35488) DataType Support Geometry Type

2024-05-29 Thread Leopold (Jira)
Leopold created FLINK-35488:
---

 Summary: DataType Support Geometry Type
 Key: FLINK-35488
 URL: https://issues.apache.org/jira/browse/FLINK-35488
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Leopold


I want to sync data from MySQL to PostgreSQL, but I couldn't do it for fields with the Geometry data type. MySQL geometry data can be converted to a string with a spatial function, for example st_astext(geom); likewise, PostgreSQL geometry data can also be converted to a string.

So I hope Flink can support converting the geometry data type for MySQL and PostgreSQL databases.

Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35487) ContinuousFileProcessingCheckpointITCase crashed as process exit with code 127

2024-05-29 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35487:
--

 Summary: ContinuousFileProcessingCheckpointITCase crashed as 
process exit with code 127
 Key: FLINK-35487
 URL: https://issues.apache.org/jira/browse/FLINK-35487
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35486) Potential sql expression generation issues on SQL gateway

2024-05-29 Thread Xingcan Cui (Jira)
Xingcan Cui created FLINK-35486:
---

 Summary: Potential sql expression generation issues on SQL gateway
 Key: FLINK-35486
 URL: https://issues.apache.org/jira/browse/FLINK-35486
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Gateway, Table SQL / Planner
Affects Versions: 1.18.1
Reporter: Xingcan Cui


We hit the following exceptions a few times when submitting queries to a session cluster through the Flink SQL gateway. When the same queries were submitted again, everything worked fine. There might be a concurrency problem in the expression generator.
{code:java}
"process.thread.name":"sql-gateway-operation-pool-thread-111","log.logger":"org.apache.flink.table.gateway.service.operation.OperationManager","error.type":"org.apache.flink.table.planner.codegen.CodeGenException","error.message":"Mismatch
 of expected output data type 'ARRAY NOT 
NULL>' and function's output type 'ARRAY NOT 
NULL>'.","error.stack_trace":"org.apache.flink.table.planner.codegen.CodeGenException:
 Mismatch of expected output data type 'ARRAY 
NOT NULL>' and function's output type 'ARRAY 
NOT NULL>'.
  at 
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.verifyOutputType(BridgingFunctionGenUtil.scala:369)
  at 
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.verifyFunctionAwareOutputType(BridgingFunctionGenUtil.scala:359)
  at 
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:107)
  at 
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:84)
  at 
org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:79)
  at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:820)
  at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:481)
  at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
  at org.apache.calcite.rex.RexCall.accept(RexCall.java:189)
  at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:478)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:469)
  at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
  at org.apache.calcite.rex.RexCall.accept(RexCall.java:189)
  at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:478)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:469)
  at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
  at org.apache.calcite.rex.RexCall.accept(RexCall.java:189)
  at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:134)
  at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$4(CalcCodeGenerator.scala:140)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceP

[jira] [Created] (FLINK-35485) JobMaster failed with "the job xx has not been finished"

2024-05-29 Thread Xingcan Cui (Jira)
Xingcan Cui created FLINK-35485:
---

 Summary: JobMaster failed with "the job xx has not been finished"
 Key: FLINK-35485
 URL: https://issues.apache.org/jira/browse/FLINK-35485
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.1
Reporter: Xingcan Cui


We ran a session cluster on K8s and used the Flink SQL gateway to submit queries. We hit the following rare exception once, which caused the job manager to restart.
{code:java}
org.apache.flink.util.FlinkException: JobMaster for job 
50d681ae1e8170f77b4341dda6aba9bc failed.
  at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1454)
  at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:776)
  at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$6(Dispatcher.java:698)
  at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
  at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source)
  at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown 
Source)
  at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
  at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
  at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
  at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
  at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
  at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
  at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
  at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
  at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
  at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
  at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
  at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
  at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
  at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
  at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
  at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
  at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
  at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
  at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
  at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
  at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
Source)
  at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
  at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.runtime.jobmaster.JobNotFinishedException: The job 
(50d681ae1e8170f77b4341dda6aba9bc) has not been finished.
  at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.closeAsync(DefaultJobMasterServiceProcess.java:157)
  at 
org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.stopJobMasterServiceProcess(JobMasterServiceLeadershipRunner.java:431)
  at 
org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.callIfRunning(JobMasterServiceLeadershipRunner.java:476)
  at 
org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.lambda$stopJobMasterServiceProcessAsync$12(JobMasterServiceLeadershipRunner.java:407)
  at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(Unknown 
Source)
  at java.base/java.util.concurrent.CompletableFuture.thenCompose(Unknown 
Source)
  at 
org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.stopJobMasterServiceProcessAsync(JobMasterServiceLeadershipRunner.java:405)
  at 
org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.runIfStateRunning(JobMasterServiceLeadershipRunner.java:463)
  at 
org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.revokeLeadership(JobMasterServiceLeadershipRunner.java:397)
  at 
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.notifyLeaderContenderOfLeadershipLoss(DefaultLeaderElectionService.java:484)
  at java.base/java.util.HashMap.forEach(Unknown Source)
  at 
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onRevokeLeadershipInternal(DefaultLeaderElectionServic
