[jira] [Created] (FLINK-35584) Support Java 21 in flink-docker
Josh England created FLINK-35584: Summary: Support Java 21 in flink-docker Key: FLINK-35584 URL: https://issues.apache.org/jira/browse/FLINK-35584 Project: Flink Issue Type: Improvement Components: flink-docker Reporter: Josh England Support Java 21. Base images are available for Java 8, 11, and 17, but since Apache Flink now supports Java 21 (albeit in beta), it would be good to have a base image for that too. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35583) flink-connector-jdbc is expected to support DDL synchronization for MySQL
linux created FLINK-35583: - Summary: flink-connector-jdbc is expected to support DDL synchronization for MySQL Key: FLINK-35583 URL: https://issues.apache.org/jira/browse/FLINK-35583 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: linux I strongly hope that flink-connector-jdbc will support DDL synchronization for MySQL. This use case is very common and is a feature that many users now expect. I hope the maintainers can add this capability. Thanks a lot! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35582) Marking ingestDB as the default recovery mode for rescaling
Yue Ma created FLINK-35582: -- Summary: Marking ingestDB as the default recovery mode for rescaling Key: FLINK-35582 URL: https://issues.apache.org/jira/browse/FLINK-35582 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Affects Versions: 2.0.0 Reporter: Yue Ma Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35580) Fix ingestDB recovery mode related bugs
Yue Ma created FLINK-35580: -- Summary: Fix ingestDB recovery mode related bugs Key: FLINK-35580 URL: https://issues.apache.org/jira/browse/FLINK-35580 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Affects Versions: 2.0.0 Reporter: Yue Ma Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35581) Remove comments from the code related to ingestDB
Yue Ma created FLINK-35581: -- Summary: Remove comments from the code related to ingestDB Key: FLINK-35581 URL: https://issues.apache.org/jira/browse/FLINK-35581 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Affects Versions: 2.0.0 Reporter: Yue Ma Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35579) Update the FrocksDB version in FLINK
Yue Ma created FLINK-35579: -- Summary: Update the FrocksDB version in FLINK Key: FLINK-35579 URL: https://issues.apache.org/jira/browse/FLINK-35579 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Affects Versions: 2.0.0 Reporter: Yue Ma Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35578) Release Frocksdb-8.10.0 official products
Yue Ma created FLINK-35578: -- Summary: Release Frocksdb-8.10.0 official products Key: FLINK-35578 URL: https://issues.apache.org/jira/browse/FLINK-35578 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Affects Versions: 2.0.0 Reporter: Yue Ma Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35577) Setup the CI environment for FRocksDB-8.10
Yue Ma created FLINK-35577: -- Summary: Setup the CI environment for FRocksDB-8.10 Key: FLINK-35577 URL: https://issues.apache.org/jira/browse/FLINK-35577 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Affects Versions: 2.0.0 Reporter: Yue Ma Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35576) Cherry-pick the commits required by ingestDB into FRocksDB
Yue Ma created FLINK-35576: -- Summary: Cherry-pick the commits required by ingestDB into FRocksDB Key: FLINK-35576 URL: https://issues.apache.org/jira/browse/FLINK-35576 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Affects Versions: 2.0.0 Reporter: Yue Ma Fix For: 2.0.0 We support the ingestDB-related API in FRocksDB-8.10.0, but many of the fixes related to ingestDB were only integrated in the latest RocksDB version. So we need to cherry-pick these fix commits into FRocksDB. They mainly include: [https://github.com/facebook/rocksdb/pull/11646] [https://github.com/facebook/rocksdb/pull/11868] [https://github.com/facebook/rocksdb/pull/11811] [https://github.com/facebook/rocksdb/pull/11381] [https://github.com/facebook/rocksdb/pull/11379] [https://github.com/facebook/rocksdb/pull/11378] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35575) FRocksDB supports disabling perf context during compilation
Yue Ma created FLINK-35575: -- Summary: FRocksDB supports disabling perf context during compilation Key: FLINK-35575 URL: https://issues.apache.org/jira/browse/FLINK-35575 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Affects Versions: 2.0.0 Reporter: Yue Ma Fix For: 2.0.0 In FRocksDB 6, the thread-local perf context is disabled by reverting a specific commit (FLINK-19710). However, this creates conflicts and makes upgrading more difficult. We found that disabling *PERF_CONTEXT* can improve the performance of the state benchmarks by about 5%, and it doesn't create any conflicts. So we plan to support disabling the perf context at compile time in the new FRocksDB version. -- This message was sent by Atlassian Jira (v8.20.10#820010)
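For illustration, a compile-time switch could look like the following, assuming RocksDB's existing NPERF_CONTEXT macro guard is the mechanism used to compile the perf context out (a hedged sketch, not the agreed FRocksDB build change):
{code:bash}
# Hypothetical build invocation: compile FRocksDB with the perf context disabled,
# relying on RocksDB's NPERF_CONTEXT macro guard in monitoring/perf_context_imp.h.
# EXTRA_CXXFLAGS is honored by the RocksDB Makefile; the target name is RocksDB's
# standard Java static build target.
make EXTRA_CXXFLAGS="-DNPERF_CONTEXT" -j8 rocksdbjavastatic
{code}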
[jira] [Created] (FLINK-35574) Set up base branch for FrocksDB-8.10
Yue Ma created FLINK-35574: -- Summary: Set up base branch for FrocksDB-8.10 Key: FLINK-35574 URL: https://issues.apache.org/jira/browse/FLINK-35574 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Affects Versions: 2.0.0 Reporter: Yue Ma Fix For: 2.0.0 As the first part of FLINK-35573, we need to prepare a base branch for FRocksDB-8.10.0 first. Mainly, it needs to be branched off version 8.10.0 of the upstream RocksDB repository. Then we cherry-pick the commits used by Flink in FRocksDB-6.20.3 onto 8.10.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35573) [FLIP-447] Upgrade FRocksDB from 6.20.3 to 8.10.0
Yue Ma created FLINK-35573: -- Summary: [FLIP-447] Upgrade FRocksDB from 6.20.3 to 8.10.0 Key: FLINK-35573 URL: https://issues.apache.org/jira/browse/FLINK-35573 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 2.0.0 Reporter: Yue Ma Fix For: 2.0.0 The FLIP: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0] *_This FLIP proposes upgrading the version of FRocksDB in the Flink project from 6.20.3 to 8.10.0._* _RocksDBStateBackend is widely used by Flink users in large-state scenarios. The last upgrade of FRocksDB was in Flink 1.14, which mainly added features such as ARM platform support, the deleteRange API, and periodic compaction. It has been a long time since then, and RocksDB has now been released up to version 8.x. The main motivation for this upgrade is to leverage the features of higher versions of RocksDB to make the Flink RocksDBStateBackend more powerful. As RocksDB is also continuously optimizing and fixing bugs, we hope to keep FRocksDB more or less in sync with RocksDB and upgrade it periodically._ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35572) flink db2 cdc default value error
junxin lai created FLINK-35572: --- Summary: flink db2 cdc default value error Key: FLINK-35572 URL: https://issues.apache.org/jira/browse/FLINK-35572 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: junxin lai I am using the Flink Db2 CDC connector to sync a database in real time, but it fails to handle default values in the schema while taking the snapshot. After digging deeper into the problem, I found that this seems to be a bug in Debezium that was fixed in 2.0.0.CR1 ([https://issues.redhat.com/browse/DBZ-4990]). The latest Flink CDC 3.1 uses Debezium version 1.9.8.Final. Default values are a common configuration in DB2. Is there a way we can backport this patch to 1.9.8.Final? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35571) ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation
Grace Grimwood created FLINK-35571: -- Summary: ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation Key: FLINK-35571 URL: https://issues.apache.org/jira/browse/FLINK-35571 Project: Flink Issue Type: Bug Components: Tests Environment: *Git revision:* {code:bash} $ git show commit b8d527166e095653ae3ff5c0431bf27297efe229 (HEAD -> master) {code} *Java info:* {code:bash} $ java -version openjdk version "17.0.11" 2024-04-16 OpenJDK Runtime Environment Temurin-17.0.11+9 (build 17.0.11+9) OpenJDK 64-Bit Server VM Temurin-17.0.11+9 (build 17.0.11+9, mixed mode) {code} {code:bash} $ sdk current Using: java: 17.0.11-tem maven: 3.8.6 scala: 2.12.19 {code} *OS info:* {code:bash} $ uname -av Darwin MacBook-Pro 23.5.0 Darwin Kernel Version 23.5.0: Wed May 1 20:14:38 PDT 2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6020 arm64 {code} *Hardware info:* {code:bash} $ sysctl -a | grep -e 'machdep\.cpu\.brand_string\:' -e 'machdep\.cpu\.core_count\:' -e 'hw\.memsize\:' hw.memsize: 34359738368 machdep.cpu.core_count: 12 machdep.cpu.brand_string: Apple M2 Pro {code} Reporter: Grace Grimwood Attachments: 20240612_181148_mvn-clean-package_flink-runtime_also-make.log *Symptom:* The test *{{ProfilingServiceTest.testRollingDeletion}}* fails with the following error: {code:java} [ERROR] Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 25.32 s <<< FAILURE! -- in org.apache.flink.runtime.util.profiler.ProfilingServiceTest [ERROR] org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion -- Time elapsed: 9.264 s <<< FAILURE! org.opentest4j.AssertionFailedError: expected: <3> but was: <6> at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145) at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531) at org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:175) at org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) {code} The number of extra files found varies from failure to failure. *Cause:* Many of the tests in *{{ProfilingServiceTest}}* rely on a specific configuration of the *{{ProfilingService}}* instance, but *{{ProfilingService.getInstance}}* does not check whether an existing instance's config matches the provided config before returning it. Because of this, and because JUnit does not guarantee a specific ordering of tests (unless they are specifically annotated), it is possible for these tests to receive an instance that does not behave in the expected way and therefore fail. 
*Analysis:* In troubleshooting the test failure, we tried adding an extra assertion to *{{ProfilingServiceTest.setUp}}* to validate that the directories being written to were correct: {code:java} Assertions.assertEquals(tempDir.toString(), profilingService.getProfilingResultDir()); {code} That assert produced the following failure: {code:java} org.opentest4j.AssertionFailedError: expected: but was: {code} This failure shows that the *{{ProfilingService}}* returned by *{{ProfilingService.getInstance}}* in the setup is not using the correct directory, and therefore cannot be the correct instance for this test class because it has the wrong config. This is because the static method *{{ProfilingService.getInstance}}* attempts to reuse any existing instance of *{{ProfilingService}}* before it creates a new one, and disregards any differences in config in doing so. This means that if another test instantiates a *{{ProfilingService}}* with a different config first and does not close it, that previous instance will be provided to *{{ProfilingServiceTest}}* rather than the new instance those tests seem to expect.
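For illustration, the problematic pattern boils down to a config-ignoring singleton accessor along these lines (a simplified sketch, not the actual Flink code; names are illustrative):
{code:java}
// Simplified sketch of the pattern described above: the cached instance is
// returned even when it was created with a different configuration.
public class ProfilingServiceSketch {
    private static ProfilingServiceSketch instance;
    private final String resultDir;

    private ProfilingServiceSketch(String resultDir) {
        this.resultDir = resultDir;
    }

    public static synchronized ProfilingServiceSketch getInstance(String resultDir) {
        if (instance == null) {
            instance = new ProfilingServiceSketch(resultDir);
        }
        // No check that instance.resultDir equals the requested resultDir,
        // so a prior test's instance (and its directory) leaks into later tests.
        return instance;
    }
}
{code}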
[jira] [Created] (FLINK-35570) Consider PlaceholderStreamStateHandle in checkpoint file merging
Zakelly Lan created FLINK-35570: --- Summary: Consider PlaceholderStreamStateHandle in checkpoint file merging Key: FLINK-35570 URL: https://issues.apache.org/jira/browse/FLINK-35570 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Reporter: Zakelly Lan Assignee: Zakelly Lan In checkpoint file merging, we should take {{PlaceholderStreamStateHandle}} into account during the lifecycle, since it can be a file-merged one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed
Jane Chan created FLINK-35569: - Summary: SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed Key: FLINK-35569 URL: https://issues.apache.org/jira/browse/FLINK-35569 Project: Flink Issue Type: Bug Components: Build System / Azure Pipelines, Build System / CI Affects Versions: 1.20.0 Reporter: Jane Chan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35568) Add imagePullSecrets for FlinkDeployment spec
Gang Huang created FLINK-35568: -- Summary: Add imagePullSecrets for FlinkDeployment spec Key: FLINK-35568 URL: https://issues.apache.org/jira/browse/FLINK-35568 Project: Flink Issue Type: Improvement Reporter: Gang Huang I am confused about how to configure imagePullSecrets for a private Docker registry, since there seem to be no related parameters in the official docs (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/) -- This message was sent by Atlassian Jira (v8.20.10#820010)
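For reference, the operator's pod template mechanism should allow passing standard Kubernetes pod fields such as imagePullSecrets; a hedged sketch (resource names are placeholders, not from the official docs):
{code:yaml}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-deployment                 # placeholder name
spec:
  image: registry.example.com/flink-app:latest
  podTemplate:
    spec:
      imagePullSecrets:
        - name: my-registry-secret    # pre-created kubernetes.io/dockerconfigjson secret
{code}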
[jira] [Created] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error
Hongshun Wang created FLINK-35567: - Summary: CDC BinaryWriter cast NullableSerializerWrapper error Key: FLINK-35567 URL: https://issues.apache.org/jira/browse/FLINK-35567 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.2.0 Reporter: Hongshun Wang Fix For: cdc-3.1.1 Currently, we generate data type serializers via org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]), which wraps each serializer in a NullableSerializerWrapper.
{code:java}
public BinaryRecordDataGenerator(DataType[] dataTypes) {
    this(
        dataTypes,
        Arrays.stream(dataTypes)
            .map(InternalSerializers::create)
            .map(NullableSerializerWrapper::new)
            .toArray(TypeSerializer[]::new));
}
{code}
However, when this is used in BinaryWriter#write and the type is ARRAY/MAP/ROW, the writer casts the NullableSerializerWrapper to the concrete serializer type (ArrayDataSerializer and the map/row equivalents). An exception is thrown:
{code:java}
java.lang.ClassCastException: org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer
    at org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134)
    at org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89)
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
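One possible direction for a fix is to unwrap the NullableSerializerWrapper before the cast in BinaryWriter#write; a hedged sketch, assuming the wrapper exposes its delegate (the accessor name is an assumption and may differ):
{code:java}
// Hedged fragment of a possible fix inside BinaryWriter#write: obtain the
// underlying serializer before casting to the concrete serializer type.
// getWrappedSerializer() is assumed here; the real accessor name may differ.
TypeSerializer<?> elementSerializer = serializer;
if (elementSerializer instanceof NullableSerializerWrapper) {
    elementSerializer = ((NullableSerializerWrapper<?>) elementSerializer).getWrappedSerializer();
}
// now the cast is safe, e.g. ((ArrayDataSerializer) elementSerializer) for ARRAY types
{code}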
[jira] [Created] (FLINK-35566) Consider promoting TypeSerializer from PublicEvolving to Public
Martijn Visser created FLINK-35566: -- Summary: Consider promoting TypeSerializer from PublicEvolving to Public Key: FLINK-35566 URL: https://issues.apache.org/jira/browse/FLINK-35566 Project: Flink Issue Type: Technical Debt Components: API / Core Reporter: Martijn Visser While working on implementing FLINK-35378, I ran into the problem that TypeSerializer has been PublicEvolving since Flink 1.0. We should consider annotating it as Public. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35565) Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
Naci Simsek created FLINK-35565: --- Summary: Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset Key: FLINK-35565 URL: https://issues.apache.org/jira/browse/FLINK-35565 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: kafka-3.1.0 Environment: This is reproduced on *Flink 1.18.1* with the latest Kafka connector 3.1.0-1.18 on a session cluster. Reporter: Naci Simsek Attachments: image-2024-06-11-11-19-09-889.png, taskmanager_localhost_54489-ac092a_log.txt h2. Summary A Flink batch job gets into an infinite fetch loop and cannot gracefully finish if the connected Kafka topic is empty and the starting offset value in the Flink job is lower than the current start/end offset of the related topic. See below for details: h2. How to reproduce A Flink +*batch*+ job with a {*}KafkaSource{*} consumes events from a Kafka topic. The related Kafka topic is empty, there are no events, and the offset value is as below: *15* !image-2024-06-11-11-19-09-889.png|width=895,height=256! The Flink job uses a *specific starting offset* value, which is +*less*+ than the current offset of the topic/partition. See below; it is set to "4":
{code:java}
package naci.grpId;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class KafkaSource_Print {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Define the specific offsets for the partitions
        Map<TopicPartition, Long> specificOffsets = new HashMap<>();
        specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // Start from offset 4 for partition 0

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9093") // Make sure the port is correct
                .setTopics("topic_test")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
                .setBounded(OffsetsInitializer.latest())
                .build();

        DataStream<String> stream = env.fromSource(
                kafkaSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source"
        );
        stream.print();
        env.execute("Flink KafkaSource test job");
    }
}
{code}
Here are the initial logs printed related to the offset, as soon as the job gets submitted:
{code:java}
2024-05-30 12:15:50,010 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]
2024-05-30 12:15:50,069 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
2024-05-30 12:15:50,074 TRACE org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - Seeking starting offsets to specified offsets: {topic_test-0=4}
2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Seeking to offset 4 for partition topic_test-0
2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - SplitsChange handling result: [topic_test-0, start:4, stop: 15]
2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
{code}
Since the starting offset *4* is *out of range* for the Kafka topic, the KafkaConsumer initiates an {*}offset +reset+{*}, as seen in the task manager logs:
{code:java}
2024-05-30 12:15:50,193 INFO  org.apache.kafka.clients.consumer.internals.Fetcher [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position FetchPosit
{code}
[jira] [Created] (FLINK-35564) The topic cannot be distributed on subtask when calculatePartitionOwner returns -1
中国无锡周良 created FLINK-35564: -- Summary: The topic cannot be distributed on subtask when calculatePartitionOwner returns -1 Key: FLINK-35564 URL: https://issues.apache.org/jira/browse/FLINK-35564 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.17.2 Reporter: 中国无锡周良 The topic cannot be distributed on a subtask when calculatePartitionOwner returns -1.
{code:java}
@VisibleForTesting
static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
    int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % parallelism;
    /*
     * Here, the assumption is that the id of Pulsar partitions are always ascending starting from
     * 0. Therefore, it can be used directly as the offset clockwise from the start index.
     */
    return (startIndex + partitionId) % parallelism;
}
{code}
Here startIndex is a non-negative number calculated from topic.hashCode(), in the range [0, parallelism - 1]. For a non-partitioned topic, however, partitionId is NON_PARTITION_ID = -1, so the method can return -1. But:
{code:java}
@Override
public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(List<Integer> readers) {
    if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
        return Optional.empty();
    }
    Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>(pendingPartitionSplits.size());
    for (Integer reader : readers) {
        Set<PulsarPartitionSplit> splits = pendingPartitionSplits.remove(reader);
        if (splits != null && !splits.isEmpty()) {
            assignMap.put(reader, new ArrayList<>(splits));
        }
    }
    if (assignMap.isEmpty()) {
        return Optional.empty();
    } else {
        return Optional.of(new SplitsAssignment<>(assignMap));
    }
}
{code}
No reader can ever have the id -1, so for a topic whose owner is calculated as -1 by the method above, pendingPartitionSplits.remove(reader) always returns null for every real reader, and that topic will never be assigned to a subtask. I simulated this topic locally and found that its messages were indeed not processed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
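For illustration, one possible fix is to special-case the non-partitioned id so the owner always lands in [0, parallelism); a hedged sketch, not the actual connector change:
{code:java}
static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
    int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % parallelism;
    if (partitionId < 0) {
        // NON_PARTITION_ID (-1): assign the non-partitioned topic to the start
        // index instead of letting (startIndex - 1) % parallelism produce -1.
        return startIndex;
    }
    return (startIndex + partitionId) % parallelism;
}
{code}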
[jira] [Created] (FLINK-35563) 'Run kubernetes application test' failed on AZP
Weijie Guo created FLINK-35563: -- Summary: 'Run kubernetes application test' failed on AZP Key: FLINK-35563 URL: https://issues.apache.org/jira/browse/FLINK-35563 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35562) WindowTableFunctionProcTimeRestoreTest produced no output for 900 seconds
Weijie Guo created FLINK-35562: -- Summary: WindowTableFunctionProcTimeRestoreTest produced no output for 900 seconds Key: FLINK-35562 URL: https://issues.apache.org/jira/browse/FLINK-35562 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35561) Flink REST API incorrect documentation
Shyam created FLINK-35561: - Summary: Flink REST API incorrect documentation Key: FLINK-35561 URL: https://issues.apache.org/jira/browse/FLINK-35561 Project: Flink Issue Type: Bug Components: Documentation, Runtime / REST Reporter: Shyam The Flink REST API documentation for JAR upload ([/jars/upload|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-upload]) states that the response will contain fileName, which will then be used later to run Flink jobs. In the [/jars/:jarid/run|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run] documentation, the definition for :jarid is "String value that identifies a jar. When uploading the jar a path is returned, where the filename is the ID. This value is equivalent to the `id` field in the list of uploaded jars (/jars)." This statement identifying the file name should be changed to: String value that identifies a jar. When uploading the jar, a path is returned, where the {*}filename contains the ID{*}, and it is the text after the last forward slash. This value is equivalent to the `id` field in the list of uploaded jars (/jars). -- This message was sent by Atlassian Jira (v8.20.10#820010)
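In other words, a client should derive the jar id from the returned path like this (a minimal sketch; the path value is hypothetical):
{code:java}
// "filename" as returned by POST /jars/upload (hypothetical value)
String uploadedPath = "/tmp/flink-web-upload/e8a3f1c2_myjob.jar";
// The jar id is the text after the last forward slash:
String jarId = uploadedPath.substring(uploadedPath.lastIndexOf('/') + 1);
// Use it as :jarid, e.g. POST /jars/e8a3f1c2_myjob.jar/run
{code}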
[jira] [Created] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern
dongwoo.kim created FLINK-35560: --- Summary: Add query validator support to flink sql gateway via spi pattern Key: FLINK-35560 URL: https://issues.apache.org/jira/browse/FLINK-35560 Project: Flink Issue Type: Improvement Components: Table SQL / Gateway Reporter: dongwoo.kim h3. Summary Hello, I'd like to suggest query validator support in the Flink SQL gateway via the SPI pattern. As an SQL gateway operator, there is a need for query validation so that only safe queries are executed and unsafe queries are dropped. To address this need, I propose adding a {{QueryValidator}} interface in the flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to Flink SQL gateway operators. h3. Interface Below is a draft for the interface. It takes an Operation and checks whether the query is valid.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/** Interface for implementing a validator that checks the safety of executing queries. */
@Public
public interface QueryValidator {
    boolean validateQuery(Operation op);
}
{code}
h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{scan.startup.timestamp-millis}}, and rejects the query when the value is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public boolean validateQuery(Operation op) {
        if (op instanceof CreateTableOperation) {
            CreateTableOperation createTableOp = (CreateTableOperation) op;
            String connector = createTableOp.getCatalogTable().getOptions().get("connector");
            if ("kafka".equals(connector)) {
                String startupTimestamp =
                        createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
                if (startupTimestamp != null
                        && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
                    return false;
                }
            }
        }
        return true;
    }
}
{code}
I'd be happy to implement this feature if we can reach an agreement. Thanks! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35559) Shading issue causes class conflict
Aleksandr Pilipenko created FLINK-35559: --- Summary: Shading issue causes class conflict Key: FLINK-35559 URL: https://issues.apache.org/jira/browse/FLINK-35559 Project: Flink Issue Type: Bug Components: Connectors / AWS Affects Versions: aws-connector-4.3.0, aws-connector-4.2.0 Reporter: Aleksandr Pilipenko Fix For: aws-connector-4.4.0 Incorrect shading configuration causes a ClassCastException during exception handling when a job packages flink-connector-kinesis together with another AWS sink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35558) [docs] "Edit This Page" tool does not follow contribution guidelines
Matt Braymer-Hayes created FLINK-35558: -- Summary: [docs] "Edit This Page" tool does not follow contribution guidelines Key: FLINK-35558 URL: https://issues.apache.org/jira/browse/FLINK-35558 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Matt Braymer-Hayes h1. Problem The [documentation site|https://nightlies.apache.org/flink/flink-docs-release-1.19/] offers an "Edit This Page" button at the bottom of most pages ([example|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/try-flink/local_installation/#summary]), allowing a user to quickly open a GitHub PR to resolve the issue. Unfortunately this feature uses the version branch (e.g., {{{}release-1.19{}}}) as the base, whereas the [documentation contribution guide|https://flink.apache.org/how-to-contribute/contribute-documentation/#submit-your-contribution] expects {{master}} to be the base. Since these release branches are often incompatible with {{master}} (i.e., I can't do a simple rebase or merge), I end up not being able to use the "Edit This Page" feature and instead have to make the change myself on GitHub or locally. h1. Solution Edit the anchor ([source|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/docs/layouts/partials/docs/footer.html#L30]) to use {{master}} instead of {{{}.Site.Params.Branch{}}}. This would lower the barrier to entry significantly for docs changes and allow the "Edit This Page" feature to work as intended. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35557) MemoryManager only reserves memory per consumer type once
Roman Khachatryan created FLINK-35557: - Summary: MemoryManager only reserves memory per consumer type once Key: FLINK-35557 URL: https://issues.apache.org/jira/browse/FLINK-35557 Project: Flink Issue Type: Bug Components: Runtime / State Backends, Runtime / Task Affects Versions: 1.18.1, 1.19.0, 1.17.2, 1.16.3, 1.20.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.20.0, 1.19.1 # In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we [create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526] a reserve function # The function [decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61] the available slot memory and fails if there's not enough memory # We pass it to {{SharedResources.getOrAllocateSharedResource}} # In {{SharedResources.getOrAllocateSharedResource}}, we check if the resource (memory) was already reserved by some key (e.g. {{state-rocks-managed-memory}}) # If not, we create a new one and call the reserve function # If the resource was already reserved (not null), we do NOT reserve the memory again: [https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71] So there will be only one (the first) memory reservation for RocksDB, for example, no matter how many state backends are created, meaning that managed memory limits are not respected. -- This message was sent by Atlassian Jira (v8.20.10#820010)
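For illustration, the reserve-once behavior described above reduces to a pattern like this (a simplified sketch with hypothetical names, not the actual Flink code):
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;

final class SharedResourcesSketch {
    private final Map<String, Object> resources = new HashMap<>();

    // Simplified sketch: the reserve callback runs only when the keyed resource
    // is first created; later callers share it and their requested size is ignored.
    synchronized Object getOrAllocate(
            String key, long size, LongConsumer reserveMemory, LongFunction<Object> create) {
        Object resource = resources.get(key);
        if (resource == null) {
            reserveMemory.accept(size); // the only reservation that ever happens
            resource = create.apply(size);
            resources.put(key, resource);
        }
        return resource; // subsequent callers' sizes are never reserved
    }
}
{code}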
[jira] [Created] (FLINK-35556) Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED
Roman Khachatryan created FLINK-35556: - Summary: Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED Key: FLINK-35556 URL: https://issues.apache.org/jira/browse/FLINK-35556 Project: Flink Issue Type: Bug Affects Versions: 1.19.0, 1.20.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.20.0, 1.19.1 See https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java#L39 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35555) Serializing List with null values throws NPE
Zhanghao Chen created FLINK-35555: - Summary: Serializing List with null values throws NPE Key: FLINK-35555 URL: https://issues.apache.org/jira/browse/FLINK-35555 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.20.0 Reporter: Zhanghao Chen Fix For: 1.20.0 FLINK-34123 introduced built-in serialization support for java.util.List, which relies on the existing {{ListSerializer}} impl. However, {{ListSerializer}} does not allow null values, as it was originally designed for serializing {{ListState}} only, where null values are explicitly forbidden by the contract. Directly adding a null marker to allow null values would break backwards state compatibility, so we'll need to introduce a new List serializer and corresponding TypeInformation that allow null values for serializing user objects. -- This message was sent by Atlassian Jira (v8.20.10#820010)
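A new null-tolerant serializer could write a per-element null marker, along these lines (a hedged sketch of one possible wire format, not the actual proposed class):
{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hedged sketch: each element is preceded by a null marker, so this format is
// deliberately NOT state-compatible with the existing ListSerializer.
public final class NullableListSerialization {

    public static <T> void serialize(
            List<T> list, TypeSerializer<T> elementSerializer, DataOutputView target)
            throws IOException {
        target.writeInt(list.size());
        for (T element : list) {
            if (element == null) {
                target.writeBoolean(true); // null marker
            } else {
                target.writeBoolean(false);
                elementSerializer.serialize(element, target);
            }
        }
    }

    public static <T> List<T> deserialize(
            TypeSerializer<T> elementSerializer, DataInputView source) throws IOException {
        int size = source.readInt();
        List<T> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            list.add(source.readBoolean() ? null : elementSerializer.deserialize(source));
        }
        return list;
    }
}
{code}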
[jira] [Created] (FLINK-35554) usrlib is not added to classpath when using containers
Josh England created FLINK-35554: Summary: usrlib is not added to classpath when using containers Key: FLINK-35554 URL: https://issues.apache.org/jira/browse/FLINK-35554 Project: Flink Issue Type: Bug Components: flink-docker Affects Versions: 1.19.0 Environment: Docker Reporter: Josh England We use flink-docker to create a "standalone" application, with a Dockerfile like... {code:java} FROM flink:1.18.1-java17 COPY application.jar /opt/flink/usrlib/artifacts/ {code} However, after upgrading to 1.19.0 we found our application would not start. We saw errors like the following in the logs: {noformat} org.apache.flink.util.FlinkException: Could not load the provided entrypoint class. at org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:230) at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.getPackagedProgram(StandaloneApplicationClusterEntryPoint.java:149) at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.lambda$main$0(StandaloneApplicationClusterEntryPoint.java:90) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:89) Caused by: org.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'X' was not found in the jar file. {noformat} We were able to fix the issue by placing the application.jar in /opt/flink/lib instead. My guess is that the usrlib directory isn't being added to the classpath by the shell scripts that launch flink from a container. -- This message was sent by Atlassian Jira (v8.20.10#820010)
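Until the classpath issue is fixed, the workaround described above looks like this (a sketch based on the report):
{code:java}
FROM flink:1.19.0-java17
# Workaround: place the job jar in /opt/flink/lib, which is on the classpath,
# instead of /opt/flink/usrlib, which is not picked up in 1.19.0 containers.
COPY application.jar /opt/flink/lib/
{code}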
[jira] [Created] (FLINK-35553) Integrate newly added trigger interface with checkpointing
Matthias Pohl created FLINK-35553: - Summary: Integrate newly added trigger interface with checkpointing Key: FLINK-35553 URL: https://issues.apache.org/jira/browse/FLINK-35553 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing, Runtime / Coordination Reporter: Matthias Pohl This connects the newly introduced trigger logic (FLINK-35551) with the newly added checkpoint lifecycle listening feature (FLINK-35552). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35552) Move CheckpointStatsTracker out of ExecutionGraph into Scheduler
Matthias Pohl created FLINK-35552: - Summary: Move CheckpointStatsTracker out of ExecutionGraph into Scheduler Key: FLINK-35552 URL: https://issues.apache.org/jira/browse/FLINK-35552 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing, Runtime / Coordination Reporter: Matthias Pohl The scheduler needs to know about the CheckpointStatsTracker to allow listening to checkpoint failures and completion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35551) Introduces RescaleManager#onTrigger endpoint
Matthias Pohl created FLINK-35551: - Summary: Introduces RescaleManager#onTrigger endpoint Key: FLINK-35551 URL: https://issues.apache.org/jira/browse/FLINK-35551 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl The new endpoint would allow us to separate observing change events from actually triggering the rescale operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35550) Introduce new component RescaleManager
Matthias Pohl created FLINK-35550: - Summary: Introduce new component RescaleManager Key: FLINK-35550 URL: https://issues.apache.org/jira/browse/FLINK-35550 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Matthias Pohl The goal here is to collect the rescaling logic in a single component to improve testability. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35549) FLIP-461: Synchronize rescaling with checkpoint creation to minimize reprocessing for the AdaptiveScheduler
Matthias Pohl created FLINK-35549: - Summary: FLIP-461: Synchronize rescaling with checkpoint creation to minimize reprocessing for the AdaptiveScheduler Key: FLINK-35549 URL: https://issues.apache.org/jira/browse/FLINK-35549 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / Coordination Affects Versions: 1.20.0 Reporter: Matthias Pohl This is the umbrella issue for implementing [FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35548) Add E2E tests for PubSubSinkV2
Ahmed Hamdy created FLINK-35548: --- Summary: Add E2E tests for PubSubSinkV2 Key: FLINK-35548 URL: https://issues.apache.org/jira/browse/FLINK-35548 Project: Flink Issue Type: Sub-task Reporter: Ahmed Hamdy Assignee: Ahmed Hamdy Fix For: gcp-pubsub-3.2.0 Refactor the Google PubSub sink to use the Unified Sink API [FLIP-143|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35547) Sources support HybridSource implements
linqigeng created FLINK-35547: - Summary: Sources support HybridSource implements Key: FLINK-35547 URL: https://issues.apache.org/jira/browse/FLINK-35547 Project: Flink Issue Type: New Feature Components: Flink CDC Reporter: linqigeng Since the specifications of slave instances are generally smaller than the primary instance, they are more likely to restart when encountering performance bottlenecks (e.g. CPU, memory, disk), and they lag behind the primary instance to some degree. In order not to affect running Flink CDC jobs and to improve effectiveness, here is an idea: have the sources adopt HybridSource, so that we can: * in the snapshot stage, have readers fetch data from the slave instances * after the snapshot phase ends, switch to reading the binlog from the primary instance -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35546) Elasticsearch 8 connector fails fast for non-retryable bulk request items
Mingliang Liu created FLINK-35546: - Summary: Elasticsearch 8 connector fails fast for non-retryable bulk request items Key: FLINK-35546 URL: https://issues.apache.org/jira/browse/FLINK-35546 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Reporter: Mingliang Liu Discussion thread: [https://lists.apache.org/thread/yrf0mmbch0lhk3rgkz94fr0x5qz2417l] {quote} Currently the Elasticsearch 8 connector retries all items if the request fails as a whole, and retries failed items if the request has partial failures [[1|https://github.com/apache/flink-connector-elasticsearch/blob/5d1f8d03e3cff197ed7fe30b79951e44808b48fe/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L152-L170]\]. I think this infinite retrying might be problematic in some cases, when retrying can never eventually succeed. For example, if the request is 400 (bad request) or 404 (not found), retries do not help. If there are too many non-retriable failed items, new requests will get processed less effectively. In extreme cases, it may stall the pipeline if in-flight requests are occupied by those failed items. FLIP-451 proposes a timeout for retrying, which helps with un-acknowledged requests but does not address the case when the request gets processed and failed items keep failing no matter how many times we retry. Correct me if I'm wrong. One opinionated option is to fail fast for non-retriable errors like 400 / 404 and to drop items for 409. Or we can allow users to configure the "drop/fail" behavior for non-retriable errors. I prefer the latter. I checked how LogStash ingests data to Elasticsearch, and it takes a similar approach for non-retriable errors [[2|https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/main/lib/logstash/plugin_mixins/elasticsearch/common.rb#L283-L304]\]. In my day job, we have a dead-letter queue in AsyncSinkWriter for failed entries that exhaust retries. I guess that is too specific to our setup and seems an overkill here for the Elasticsearch connector. {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
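For illustration, the proposed error classification could be sketched like this (status-code handling per the discussion above; the names are hypothetical, not the connector's API):
{code:java}
// Hypothetical classification helper reflecting the proposal above:
// fail fast on 400/404, drop on 409, retry everything else.
enum FailedItemAction { RETRY, DROP, FAIL }

static FailedItemAction classify(int httpStatus) {
    switch (httpStatus) {
        case 400: // bad request - retrying can never succeed
        case 404: // not found   - retrying can never succeed
            return FailedItemAction.FAIL;
        case 409: // conflict    - drop the item
            return FailedItemAction.DROP;
        default:
            return FailedItemAction.RETRY;
    }
}
{code}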
[jira] [Created] (FLINK-35545) Miss 3.0.1 version in snapshot flink-cdc doc version list
Zhongqiang Gong created FLINK-35545: --- Summary: Miss 3.0.1 version in snapshot flink-cdc doc version list Key: FLINK-35545 URL: https://issues.apache.org/jira/browse/FLINK-35545 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: Zhongqiang Gong -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35544) No description of Flink Operator in the Deployment of Kubernetes document
linqigeng created FLINK-35544: - Summary: No description of Flink Operator in the Deployment of Kubernetes document Key: FLINK-35544 URL: https://issues.apache.org/jira/browse/FLINK-35544 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: linqigeng There is no description of Flink Operator in the Deployment of Kubernetes document, so users don't know how to submit a pipeline job via Flink Operator. The link is : {code:java} https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/deployment/kubernetes/ {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35543) Upgrade Hive 2.3 connector to version 2.3.10
Cheng Pan created FLINK-35543: - Summary: Upgrade Hive 2.3 connector to version 2.3.10 Key: FLINK-35543 URL: https://issues.apache.org/jira/browse/FLINK-35543 Project: Flink Issue Type: Improvement Components: Connectors / Hive Reporter: Cheng Pan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35542) ClassNotFoundException when deserializing CheckpointedOffset
Jan Gurda created FLINK-35542: - Summary: ClassNotFoundException when deserializing CheckpointedOffset Key: FLINK-35542 URL: https://issues.apache.org/jira/browse/FLINK-35542 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: jdbc-3.1.2 Environment: Flink 1.19.0 Flink JDBC Connector 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca) JDK 11 (Temurin) Reporter: Jan Gurda Fix For: jdbc-3.2.0 I use the latest flink-connector-jdbc code from the main branch, it's actually 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca). When jobs get interrupted while reading data from the JDBC source (for example, by the TaskManager outage), they cannot recover due to the following exception: {code:java} java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71) at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34) at org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122) at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158) at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247) at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186) at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571) at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.base/java.lang.Thread.run(Unknown Source) Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) at java.base/java.lang.ClassLoader.loadClass(Unknown Source) at 
java.base/java.lang.Class.forName0(Native Method) at java.base/java.lang.Class.forName(Unknown Source) at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92) at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source) at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at java.base/java.io.ObjectInputStream.readObject0(Unknown Source) at java.base/java.io.ObjectInputStream.readObject(Unknown Source) at java.base/java.io.ObjectInputStream.readObject(Unknown Source) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527) at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109) at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69
[jira] [Created] (FLINK-35541) Introduce retry limiting for AWS connector sinks
Aleksandr Pilipenko created FLINK-35541: --- Summary: Introduce retry limiting for AWS connector sinks Key: FLINK-35541 URL: https://issues.apache.org/jira/browse/FLINK-35541 Project: Flink Issue Type: Technical Debt Components: Connectors / AWS, Connectors / DynamoDB, Connectors / Firehose, Connectors / Kinesis Affects Versions: aws-connector-4.2.0 Reporter: Aleksandr Pilipenko Currently, if the record write operation in the sink consistently fails with a retriable error, the sinks will retry indefinitely. When the cause of the error is not resolved, this may lead to a stuck operator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name
Qigeng Lin created FLINK-35540: -- Summary: flink-cdc-pipeline-connector-mysql lost table which database and table with the same name Key: FLINK-35540 URL: https://issues.apache.org/jira/browse/FLINK-35540 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: Qigeng Lin h1. Description When the 'tables' parameter of a MySQL pipeline job contains a table whose database and table have the same name, like 'app.app', the job will fail with an error message like:
{code:java}
java.lang.IllegalArgumentException: Cannot find any table by the option 'tables' = app.app
{code}
h1. How to reproduce Create a database and a table both named `app`, then submit a pipeline job defined by a YAML like this:
{code:java}
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app.app
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35539) The artifactId of flink-migration-test-utils module has a typo
Zhen Wang created FLINK-35539: - Summary: The artifactId of flink-migration-test-utils module has a typo Key: FLINK-35539 URL: https://issues.apache.org/jira/browse/FLINK-35539 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.20.0 Reporter: Zhen Wang The artifactId of flink-migration-test-utils module has a typo. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35538) pipeline sink support caseSensitive
melin created FLINK-35538: - Summary: pipeline sink support caseSensitive Key: FLINK-35538 URL: https://issues.apache.org/jira/browse/FLINK-35538 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.2.0, cdc-3.1.1 Reporter: melin Attachments: image-2024-06-06-15-51-02-428.png The source is case-sensitive, while the sink is case-insensitive. Paimon, for example, doesn't even allow capital letters. !image-2024-06-06-15-51-02-428.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35537) Error parsing list of enum in legacy yaml configuration
Zakelly Lan created FLINK-35537: --- Summary: Error parsing list of enum in legacy yaml configuration Key: FLINK-35537 URL: https://issues.apache.org/jira/browse/FLINK-35537 Project: Flink Issue Type: Bug Components: Runtime / Configuration Affects Versions: 1.19.0 Reporter: Zakelly Lan In Flink 1.19.0, when I submit a job to a standalone cluster, the TM throws
{code:java}
Caused by: java.lang.IllegalArgumentException: Could not parse value '[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'.
    at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827)
    at org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312)
    at org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361)
    at org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
    ... 19 more
Caused by: java.lang.IllegalArgumentException: Could not parse value for enum class org.rocksdb.CompressionType. Expected one of: [[NO_COMPRESSION, SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, DISABLE_COMPRESSION_OPTION]]
    at org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499)
    at org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392)
    at org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441)
    at org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432)
    at org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819)
    ... 28 more
{code}
I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in flink-conf.yaml. I also tried Flink 1.18.1, and it runs well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35536) FileSystem sink on S3 produces invalid Avros when compaction is disabled
Juliusz Nadberezny created FLINK-35536: -- Summary: FileSystem sink on S3 produces invalid Avros when compaction is disabled Key: FLINK-35536 URL: https://issues.apache.org/jira/browse/FLINK-35536 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.19.0 Reporter: Juliusz Nadberezny Compaction in the FileSystem sink on S3 uses the multipart upload process. When compaction is disabled after having been enabled, the files that were being kept by the multipart upload and are then "released" with CompleteMultipartUpload will be broken. Broken Avro files seem to have the Avro schema duplicated at the beginning of the file. Steps to reproduce: 1. Deploy a job with a FileSystem sink with compaction enabled, writing to S3/MinIO. 2. Wait for the job to produce some output. 3. Redeploy the job with compaction disabled. 4. Wait for the multipart upload to complete and verify the released files. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35535) Enable benchmark profiling in daily run
Zakelly Lan created FLINK-35535: --- Summary: Enable benchmark profiling in daily run Key: FLINK-35535 URL: https://issues.apache.org/jira/browse/FLINK-35535 Project: Flink Issue Type: Improvement Components: Benchmarks Reporter: Zakelly Lan Assignee: Zakelly Lan Fix For: 1.20.0 After FLINK-35534, flink-benchmark supports profiling. We could consider enabling this in the daily run. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35534) Support profiler for benchmarks
Zakelly Lan created FLINK-35534: --- Summary: Support profiler for benchmarks Key: FLINK-35534 URL: https://issues.apache.org/jira/browse/FLINK-35534 Project: Flink Issue Type: Improvement Components: Benchmarks Reporter: Zakelly Lan Fix For: 1.20.0 As JMH supports profiling during benchmarks, flink-benchmark could leverage this (see the sketch after this message). -- This message was sent by Atlassian Jira (v8.20.10#820010)
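For FLINK-35534 above, a minimal sketch of what attaching a JMH profiler can look like; the benchmark name pattern is hypothetical, and flink-benchmark's actual wiring may differ:
{code:java}
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class ProfiledBenchmarkRunner {

    public static void main(String[] args) throws RunnerException {
        Options opts = new OptionsBuilder()
                .include(".*MyBenchmark.*")    // hypothetical benchmark class pattern
                .addProfiler(GCProfiler.class) // attach a JMH profiler (GC stats here)
                .build();
        new Runner(opts).run();
    }
}
{code}
The same effect is available on JMH's command line via the -prof flag (e.g. -prof gc).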
[jira] [Created] (FLINK-35533) FLIP-459: Support Flink hybrid shuffle integration with Apache Celeborn
Yuxin Tan created FLINK-35533: - Summary: FLIP-459: Support Flink hybrid shuffle integration with Apache Celeborn Key: FLINK-35533 URL: https://issues.apache.org/jira/browse/FLINK-35533 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.20.0 Reporter: Yuxin Tan Assignee: Yuxin Tan This is the umbrella jira for [FLIP-459|https://cwiki.apache.org/confluence/display/FLINK/FLIP-459%3A+Support+Flink+hybrid+shuffle+integration+with+Apache+Celeborn]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35532) Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard
Hong Liang Teoh created FLINK-35532: --- Summary: Prevent Cross-Site Authentication (XSA) attacks on Flink dashboard Key: FLINK-35532 URL: https://issues.apache.org/jira/browse/FLINK-35532 Project: Flink Issue Type: Technical Debt Components: Runtime / Web Frontend Affects Versions: 1.19.0, 1.19.1 Reporter: Hong Liang Teoh Assignee: Hong Liang Teoh Fix For: 1.19.2 As part of FLINK-33325, we introduced a new tab on the Flink dashboard to trigger the async profiler on the JobManager and TaskManager. The HTML component introduced links out to the async profiler page on GitHub -> [https://github.com/async-profiler/async-profiler/wiki]. However, the anchor element introduced does not follow the best practice for preventing XSA attacks, which is to set the attributes below: {code:java} target="_blank" rel="noopener noreferrer"{code} We should add these attributes as a best practice! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35531) Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream
dzcxzl created FLINK-35531: -- Summary: Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream Key: FLINK-35531 URL: https://issues.apache.org/jira/browse/FLINK-35531 Project: Flink Issue Type: Bug Reporter: dzcxzl -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35530) protobuf-format: support discarding unknown fields to improve deserialization performance
JingWei Li created FLINK-35530: -- Summary: protobuf-format: support discarding unknown fields to improve deserialization performance Key: FLINK-35530 URL: https://issues.apache.org/jira/browse/FLINK-35530 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: JingWei Li Add a protobuf option that allows calling _CodedStreamHelper.discardUnknownFields_ to save the performance overhead of deserializing unknown fields when decoding data. {code:java} create table source (...) with ( 'format' = 'protobuf', 'protobuf.discard-unknown-field' = 'true' ){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35529) protobuf-format: compatibility with bad protobuf identifiers
JingWei Li created FLINK-35529: -- Summary: protobuf-format: compatibility with bad protobuf identifiers Key: FLINK-35529 URL: https://issues.apache.org/jira/browse/FLINK-35529 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.17.2 Reporter: JingWei Li Fix For: 2.0.0 The main bug occurs during the decode process. The decode method is generated by Flink's codegen at runtime, and generating it requires some getter and setter methods of the protobuf object in order to construct the RowData. Currently, the getters and setters are generated through string concatenation, using the "get" prefix and camelCase variable names. Some special characters may lead to errors in the generated getter and setter methods, thus causing bugs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35528) Skip execution of interruptible mails when yielding
Piotr Nowojski created FLINK-35528: -- Summary: Skip execution of interruptible mails when yielding Key: FLINK-35528 URL: https://issues.apache.org/jira/browse/FLINK-35528 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing, Runtime / Task Affects Versions: 1.20.0 Reporter: Piotr Nowojski Assignee: Piotr Nowojski When operators are yielding, for example waiting for async state access to complete before a checkpoint, it would be beneficial not to execute interruptible mails. Otherwise, the continuation mail for firing timers would be continuously re-enqueued. To achieve that, the MailboxExecutor must be aware of which mails are interruptible. The easiest way to achieve this is to set MIN_PRIORITY for interruptible mails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35527) Polish quickstart guide & clean stale links in docs
yux created FLINK-35527: --- Summary: Polish quickstart guide & clean stale links in docs Key: FLINK-35527 URL: https://issues.apache.org/jira/browse/FLINK-35527 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: yux Fix For: cdc-3.2.0 Currently, there are still a lot of stale links in the Flink CDC docs, including some download links pointing to Ververica maven repositories. We need to clean them up to avoid confusing users. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35526) Remove deprecated stedolan/jq Docker image from Flink e2e tests
Robert Metzger created FLINK-35526: -- Summary: Remove deprecated stedolan/jq Docker image from Flink e2e tests Key: FLINK-35526 URL: https://issues.apache.org/jira/browse/FLINK-35526 Project: Flink Issue Type: Bug Components: Test Infrastructure Reporter: Robert Metzger Assignee: Robert Metzger Our CI logs contain this warning: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60060=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=3828 {code} latest: Pulling from stedolan/jq [DEPRECATION NOTICE] Docker Image Format v1, and Docker Image manifest version 2, schema 1 support will be removed in an upcoming release. Suggest the author of docker.io/stedolan/jq:latest to upgrade the image to the OCI Format, or Docker Image manifest v2, schema 2. More information at https://docs.docker.com/go/deprecated-image-specs/ {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35525) HDFS delegation token fetched by custom DelegationTokenProvider is not passed to Yarn AM
Zhen Wang created FLINK-35525: - Summary: HDFS delegation token fetched by custom DelegationTokenProvider is not passed to Yarn AM Key: FLINK-35525 URL: https://issues.apache.org/jira/browse/FLINK-35525 Project: Flink Issue Type: Bug Components: Deployment / YARN Affects Versions: 1.18.1, 1.19.0 Reporter: Zhen Wang I tried running Flink with a Hadoop proxy user by disabling HadoopModuleFactory and Flink's built-in token providers, and implementing a custom token provider. However, only the HDFS token obtained by the hadoopfs provider was added in YarnClusterDescriptor, which resulted in a Yarn AM submission failure. Discussion: https://github.com/apache/flink/pull/22009#issuecomment-2132676114 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35524) Clear connection pools when reader exits
Hongshun Wang created FLINK-35524: - Summary: Clear connection pools when reader exits Key: FLINK-35524 URL: https://issues.apache.org/jira/browse/FLINK-35524 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.1 Reporter: Hongshun Wang Fix For: cdc-3.2.0 Currently, JdbcConnectionPools is a static instance, so the datasource pools in it won't be recycled when a reader closes. This causes a memory leak.
{code:java}
public class JdbcConnectionPools implements ConnectionPools {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class);

    private static JdbcConnectionPools instance;
    private final Map pools = new HashMap<>();
    private static final Map POOL_FACTORY_MAP = new HashMap<>();
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
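For FLINK-35524 above, a minimal sketch of one possible fix direction, with hypothetical names (this is not the actual Flink CDC patch): reference-count the pooled data sources and close a pool when its last reader releases it:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class RefCountedPools {

    private static final class PoolEntry {
        final AutoCloseable dataSource; // e.g. a HikariDataSource
        int refCount;

        PoolEntry(AutoCloseable dataSource) {
            this.dataSource = dataSource;
        }
    }

    private final Map<String, PoolEntry> pools = new HashMap<>();

    synchronized void release(String poolId) throws Exception {
        PoolEntry entry = pools.get(poolId);
        if (entry != null && --entry.refCount == 0) {
            pools.remove(poolId);     // drop the long-lived reference...
            entry.dataSource.close(); // ...and close the underlying pool
        }
    }
}
{code}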
[jira] [Created] (FLINK-35523) When using the Hive connector to read a Hive table in Parquet format, a null pointer exception is thrown.
Jichao Wang created FLINK-35523: --- Summary: When using the Hive connector to read a Hive table in Parquet format, a null pointer exception is thrown. Key: FLINK-35523 URL: https://issues.apache.org/jira/browse/FLINK-35523 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.19.0, 1.16.2 Reporter: Jichao Wang When using the Hive connector to read a Hive table in Parquet format, a null pointer exception is thrown. The exception stack information is as follows: {code:text} java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342] Caused by: java.io.IOException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.fetch(HdpFileSourceSplitReader.java:40) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] ... 
6 more Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[?:1.8.0_342] at java.util.ArrayList.get(ArrayList.java:435) ~[?:1.8.0_342] at org.apache.flink.hive.shaded.parquet.schema.GroupType.getType(GroupType.java:216) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:277) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:266) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:256) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:139) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:75) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:110) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:65) ~[flink-connector-hive_2.12-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP-24.06.1] at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.lambda$fetch$0(HdpFileSourceSplitReader.java:38) ~[flink-connector-files-1.16.2-HDP-24.06.1.jar:1.16.2-HDP
[jira] [Created] (FLINK-35522) The source task may get stuck after a failover occurs in batch jobs
xingbe created FLINK-35522: -- Summary: The source task may get stuck after a failover occurs in batch jobs Key: FLINK-35522 URL: https://issues.apache.org/jira/browse/FLINK-35522 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.18.1, 1.19.0, 1.17.2, 1.20.0 Reporter: xingbe Fix For: 1.20.0 If the source task does not get assigned a split because the SplitEnumerator has no more splits, and a failover occurs during the closing process, the SourceCoordinatorContext will not resend the NoMoreSplit event to the newly started source task, causing the source vertex to remain stuck indefinitely. This case may only occur in batch jobs where speculative execution has been enabled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35521) Flink FileSystem SQL Connector Generating _SUCCESS File Multiple Times
EMERSON WANG created FLINK-35521: Summary: Flink FileSystem SQL Connector Generating _SUCCESS File Multiple Times Key: FLINK-35521 URL: https://issues.apache.org/jira/browse/FLINK-35521 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Affects Versions: 1.18.1 Environment: Our PyFlink SQL jobs are running in an AWS EKS environment. Reporter: EMERSON WANG Our Flink Table SQL job received data from Kafka streams and then sank all partitioned data into the associated parquet files under the same S3 folder through the filesystem SQL connector. For the S3 filesystem SQL connector, sink.partition-commit.policy.kind was set to 'success-file' and sink.partition-commit.trigger was set to 'partition-time'. We found that the _SUCCESS file in the S3 folder was generated multiple times as multiple partitions were committed. Because all partitioned parquet files and the _SUCCESS file are in the same S3 folder, and the _SUCCESS file is used to trigger the downstream application, we would really like the _SUCCESS file to be generated only once, after all partitions are committed and all parquet files are ready to be processed. That way, one _SUCCESS file can trigger the downstream application only once instead of multiple times. We knew we could set sink.partition-commit.trigger to 'process-time' to generate the _SUCCESS file only once in the S3 folder; however, 'process-time' would not meet our business requirements. We request that the FileSystem SQL connector support the following new use case: even if sink.partition-commit.trigger is set to 'partition-time', the _SUCCESS file will be generated only once, after all partitions are committed and all output files are ready to be processed, and will be used to trigger the downstream application only once instead of multiple times. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35520) Nightly build can't compile as problems were detected from NoticeFileChecker
Weijie Guo created FLINK-35520: -- Summary: Nightly build can't compile as problems were detected from NoticeFileChecker Key: FLINK-35520 URL: https://issues.apache.org/jira/browse/FLINK-35520 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35519) Flink Job fails with SingleValueAggFunction received more than one element
Dawid Wysakowicz created FLINK-35519: Summary: Flink Job fails with SingleValueAggFunction received more than one element Key: FLINK-35519 URL: https://issues.apache.org/jira/browse/FLINK-35519 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.19.0 Reporter: Dawid Wysakowicz When running a query: {code} select (SELECT t.id FROM raw_pagerduty_users, UNNEST(teams) AS t(id, type, summary, self, html_url)) from raw_pagerduty_users; {code} it is translated to: {code} Sink(table=[default_catalog.default_database.sink], fields=[EXPR$0]) +- Calc(select=[$f0 AS EXPR$0]) +- Join(joinType=[LeftOuterJoin], where=[true], select=[c, $f0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) :- Exchange(distribution=[single]) : +- Calc(select=[c]) : +- TableSourceScan(table=[[default_catalog, default_database, raw_pagerduty_users, project=[c, teams], metadata=[]]], fields=[c, teams])(reuse_id=[1]) +- Exchange(distribution=[single]) +- GroupAggregate(select=[SINGLE_VALUE(id) AS $f0]) +- Exchange(distribution=[single]) +- Calc(select=[id]) +- Correlate(invocation=[$UNNEST_ROWS$1($cor0.teams)], correlate=[table($UNNEST_ROWS$1($cor0.teams))], select=[c,teams,id,type,summary,self,html_url], rowType=[RecordType(BIGINT c, RecordType:peek_no_expand(VARCHAR(2147483647) id, VARCHAR(2147483647) type, VARCHAR(2147483647) summary, VARCHAR(2147483647) self, VARCHAR(2147483647) html_url) ARRAY teams, VARCHAR(2147483647) id, VARCHAR(2147483647) type, VARCHAR(2147483647) summary, VARCHAR(2147483647) self, VARCHAR(2147483647) html_url)], joinType=[INNER]) +- Reused(reference_id=[1]) {code} and it fails with: {code} java.lang.RuntimeException: SingleValueAggFunction received more than one element. at GroupAggsHandler$150.accumulate(Unknown Source) at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151) at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:571) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:900) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:849) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35518) CI Bot doesn't run on PRs
Piotr Nowojski created FLINK-35518: -- Summary: CI Bot doesn't run on PRs Key: FLINK-35518 URL: https://issues.apache.org/jira/browse/FLINK-35518 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Piotr Nowojski The CI bot doesn't want to run on my PR/branch. I tried force-pushes, rebases, asking the Flink bot to run, and closing and opening a new PR, but nothing helped: https://github.com/apache/flink/pull/24868 https://github.com/apache/flink/pull/24883 I've heard others have been having similar problems recently. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35517) CI pipeline triggered by pull request seems unstable
Weijie Guo created FLINK-35517: -- Summary: CI pipeline triggered by pull request seems unstable Key: FLINK-35517 URL: https://issues.apache.org/jira/browse/FLINK-35517 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo The Flink CI pipeline triggered by pull requests seems somewhat unstable. For example, https://github.com/apache/flink/pull/24883 was filed 15 hours ago, but the CI report is still UNKNOWN. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35516) Update the Experimental annotation to PublicEvolving for files connector
RocMarshal created FLINK-35516: -- Summary: Update the Experimental annotation to PublicEvolving for files connector Key: FLINK-35516 URL: https://issues.apache.org/jira/browse/FLINK-35516 Project: Flink Issue Type: Technical Debt Components: Connectors / FileSystem Reporter: RocMarshal As described in https://issues.apache.org/jira/browse/FLINK-35496, we should update the annotations for the stable APIs in the files connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35515) Upgrade hive version to 4.0.0
vikasap created FLINK-35515: --- Summary: Upgrade hive version to 4.0.0 Key: FLINK-35515 URL: https://issues.apache.org/jira/browse/FLINK-35515 Project: Flink Issue Type: Improvement Components: Connectors / Hive Affects Versions: 1.18.1 Reporter: vikasap Fix For: 1.18.2 Hive version 4.0.0 was released recently. However, none of the major Flink versions work with it. Filing this so that Flink SQL and the Table API in the major Flink versions will be able to work with the new version of the Hive metastore. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35514) Add Flink CDC Channel to Apache Flink Slack Workspace
Zhongqiang Gong created FLINK-35514: --- Summary: Add Flink CDC Channel to Apache Flink Slack Workspace Key: FLINK-35514 URL: https://issues.apache.org/jira/browse/FLINK-35514 Project: Flink Issue Type: Bug Reporter: Zhongqiang Gong DISCUSS thread : https://lists.apache.org/thread/gqzrs3c0j9k7c5m9m5k2slomgorrqrwf -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35513) ArtifactFetchManager unit tests are failing
Elphas Toringepi created FLINK-35513: Summary: ArtifactFetchManager unit tests are failing Key: FLINK-35513 URL: https://issues.apache.org/jira/browse/FLINK-35513 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.19.0 Reporter: Elphas Toringepi The following unit tests in ArtifactFetchManagerTest are failing * testFileSystemFetchWithAdditionalUri() * testHttpFetch() * testMixedArtifactFetch() {code:java} Test org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testFileSystemFetchWithAdditionalUri[testFileSystemFetchWithAdditionalUri()] failed with: java.lang.AssertionError: Expecting actual not to be empty at org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.getFlinkClientsJar(ArtifactFetchManagerTest.java:251) at org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testFileSystemFetchWithAdditionalUri(ArtifactFetchManagerTest.java:104) ... Test org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testMixedArtifactFetch[testMixedArtifactFetch()] failed with: java.lang.AssertionError: Expecting actual not to be empty at org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.getFlinkClientsJar(ArtifactFetchManagerTest.java:251) at org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testMixedArtifactFetch(ArtifactFetchManagerTest.java:149) ... Test org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testHttpFetch[testHttpFetch()] failed with: java.lang.AssertionError: Expecting actual not to be empty at org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.getFlinkClientsJar(ArtifactFetchManagerTest.java:251) at org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testHttpFetch(ArtifactFetchManagerTest.java:124) ... {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35512) ArtifactFetchManagerTest unit tests fail
Hong Liang Teoh created FLINK-35512: --- Summary: ArtifactFetchManagerTest unit tests fail Key: FLINK-35512 URL: https://issues.apache.org/jira/browse/FLINK-35512 Project: Flink Issue Type: Technical Debt Affects Versions: 1.19.1 Reporter: Hong Liang Teoh Fix For: 1.19.1 The below three tests from *ArtifactFetchManagerTest* seem to fail consistently: * ArtifactFetchManagerTest.testFileSystemFetchWithAdditionalUri * ArtifactFetchManagerTest.testMixedArtifactFetch * ArtifactFetchManagerTest.testHttpFetch The error printed is {code:java} java.lang.AssertionError: Expecting actual not to be empty at org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.getFlinkClientsJar(ArtifactFetchManagerTest.java:248) at org.apache.flink.client.program.artifact.ArtifactFetchManagerTest.testMixedArtifactFetch(ArtifactFetchManagerTest.java:146) at java.lang.reflect.Method.invoke(Method.java:498) at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35511) Enhance metrics. The incremental and full phases count the number of records separately
melin created FLINK-35511: - Summary: Enhance metrics. The incremental and full phases count the number of records separately Key: FLINK-35511 URL: https://issues.apache.org/jira/browse/FLINK-35511 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.2.0 Reporter: melin Attachments: image-2024-06-03-22-06-38-591.png Metric information from DataWorks real-time synchronization: !image-2024-06-03-22-06-38-591.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35510) Implement basic incremental checkpoint for ForStStateBackend
Feifan Wang created FLINK-35510: --- Summary: Implement basic incremental checkpoint for ForStStateBackend Key: FLINK-35510 URL: https://issues.apache.org/jira/browse/FLINK-35510 Project: Flink Issue Type: New Feature Components: Runtime / State Backends Reporter: Feifan Wang Use the low-level DB API to implement a basic incremental checkpoint for ForStStateBackend, following these steps (see the sketch after this message):
# db.disableFileDeletions()
# db.getLiveFiles(true)
# db.enableFileDeletions(false)
-- This message was sent by Atlassian Jira (v8.20.10#820010)
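For FLINK-35510 above, a minimal sketch of the three-step sequence using the RocksDB Java API; the upload step is a placeholder, and this is not the actual ForSt implementation:
{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class LiveFilesSnapshotSketch {

    public static void snapshot(RocksDB db) throws RocksDBException {
        db.disableFileDeletions(); // 1. keep SST files from being deleted underneath us
        try {
            // 2. flush the memtable and list all live files of the current DB state
            RocksDB.LiveFiles liveFiles = db.getLiveFiles(true);
            for (String file : liveFiles.files) {
                // copy/upload each live file to checkpoint storage here (placeholder)
            }
        } finally {
            db.enableFileDeletions(false); // 3. re-enable deletions without forcing
        }
    }
}
{code}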
[jira] [Created] (FLINK-35509) Slack community invite link has expired
Ufuk Celebi created FLINK-35509: --- Summary: Slack community invite link has expired Key: FLINK-35509 URL: https://issues.apache.org/jira/browse/FLINK-35509 Project: Flink Issue Type: Bug Components: Project Website Reporter: Ufuk Celebi The Slack invite link on the website has expired. I've generated a new invite link without expiration here: [https://join.slack.com/t/apache-flink/shared_invite/zt-2k0fdioxx-D0kTYYLh3pPjMu5IItqx3Q] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35508) Use OceanBase LTS version Docker image in testing
He Wang created FLINK-35508: --- Summary: Use OceanBase LTS version Docker image in testing Key: FLINK-35508 URL: https://issues.apache.org/jira/browse/FLINK-35508 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: He Wang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35507) Support For Individual Job Level Resource Allocation in Session Cluster in k8s
Amarjeet created FLINK-35507: Summary: Support For Individual Job Level Resource Allocation in Session Cluster in k8s Key: FLINK-35507 URL: https://issues.apache.org/jira/browse/FLINK-35507 Project: Flink Issue Type: New Feature Components: Deployment / Kubernetes Reporter: Amarjeet We could have a setup like Spark, where in a Spark cluster we can set individual job-level settings for the resources each job gets, from memory to cores. Also, support dynamic memory allocation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled
elon_X created FLINK-35506: -- Summary: disable kafka auto-commit and rely on flink’s checkpointing if both are enabled Key: FLINK-35506 URL: https://issues.apache.org/jira/browse/FLINK-35506 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.16.1 Reporter: elon_X When I use KafkaSource for consuming topics and set the Kafka parameter {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the task, I notice that both will commit offsets. Should Kafka's auto-commit be disabled when enabling Flink checkpointing, similar to how it's done with FlinkKafkaConsumer? -- This message was sent by Atlassian Jira (v8.20.10#820010)
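For FLINK-35506 above, a minimal sketch of the explicit workaround (broker, topic, and group names are hypothetical): turn off Kafka's auto-commit on the KafkaSource so that offsets are committed only through Flink checkpoints:
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class NoAutoCommitKafkaSource {

    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092") // hypothetical broker
                .setTopics("my-topic")              // hypothetical topic
                .setGroupId("my-group")             // hypothetical group
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("enable.auto.commit", "false") // checkpoints own the commits
                .build();
    }
}
{code}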
[jira] [Created] (FLINK-35505) RegionFailoverITCase.testMultiRegionFailover has never ever restored state
Weijie Guo created FLINK-35505: -- Summary: RegionFailoverITCase.testMultiRegionFailover has never ever restored state Key: FLINK-35505 URL: https://issues.apache.org/jira/browse/FLINK-35505 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35504) Improve Elasticsearch 8 connector observability
Mingliang Liu created FLINK-35504: - Summary: Improve Elasticsearch 8 connector observability Key: FLINK-35504 URL: https://issues.apache.org/jira/browse/FLINK-35504 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Affects Versions: elasticsearch-3.1.0 Reporter: Mingliang Liu Currently all logs are at DEBUG level. Some of those messages are very helpful for tracking progress and errors and can be changed to INFO or WARN level. We can also include error details in DEBUG-level messages so it's easier to debug with more context. Meanwhile, we can update the metrics to track {{numRecordsSend}}. FWIW, the base class already tracks the following metrics, so we don't need to implement them: the {{CurrentSendTime}} gauge, and the {{NumBytesOut}} and {{NumRecordsOut}} counters. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35503) OracleE2eITCase fails with error ORA-12528 on Mac M2
Saketh Kurnool created FLINK-35503: -- Summary: OracleE2eITCase fails with error ORA-12528 on Mac M2 Key: FLINK-35503 URL: https://issues.apache.org/jira/browse/FLINK-35503 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.0 Environment: * Mac M2 (Apple Silicon) * using docker desktop with Rosetta enabled for amd64 emulation Reporter: Saketh Kurnool Attachments: com.ververica.cdc.connectors.tests.OracleE2eITCase.txt, oracle-docker-setup-logs.txt Hello Flink CDC community, I am attempting to run `OracleE2eITCase` (in the cdc source connector e2e tests), and I am running into the following runtime exception: {code:java} java.sql.SQLException: Listener refused the connection with the following error: ORA-12528, TNS:listener: all appropriate instances are blocking new connections at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:854) at oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793) at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57) at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747) at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:562) at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:677) at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:228) at com.ververica.cdc.connectors.tests.OracleE2eITCase.getOracleJdbcConnection(OracleE2eITCase.java:197) at com.ververica.cdc.connectors.tests.OracleE2eITCase.testOracleCDC(OracleE2eITCase.java:149) at java.base/java.lang.reflect.Method.invoke(Method.java:567) at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) Caused by: oracle.net.ns.NetException: Listener refused the connection with the following error: ORA-12528, TNS:listener: all appropriate instances are blocking new connections at oracle.net.ns.NSProtocolNIO.negotiateConnection(NSProtocolNIO.java:284) at oracle.net.ns.NSProtocol.connect(NSProtocol.java:340) at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1596) at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:588) ... 11 more{code} I have attached the test results to this issue. `OracleE2eITCase` runs the `goodboy008/oracle-19.3.0-ee:non-cdb` docker image. I am able to reproduce the same issue when I run this docker image locally - my observation is that dockerized Oracle DB instance is not being set up properly, as I notice another ORA in the setup logs (`ORA-03113: end-of-file on communication channel`). I have also attached the logs from the docker image setup to this issue. To reproduce the ORA-12528 issue locally, I: * ran: `docker run goodboy008/oracle-19.3.0-ee:non-cdb` * ssh'ed into the db pod * ran: `sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba` Any insight/workaround on getting this e2e test and the docker image running on my machine would be much appreciated. I'm also happy to provide any other information regarding my setup in the comments. Thank you! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35502) Compress the checkpoint metadata in ZK/ETCD HA Services
Ying Z created FLINK-35502: -- Summary: Compress the checkpoint metadata in ZK/ETCD HA Services Key: FLINK-35502 URL: https://issues.apache.org/jira/browse/FLINK-35502 Project: Flink Issue Type: Improvement Reporter: Ying Z In the implementation of Flink HA, the checkpoint metadata is stored in either Zookeeper (ZK HA) or ETCD (K8s HA), for example:
{code}
checkpointID-0036044:
checkpointID-0036045:
...
{code}
However, neither of these is designed to store large amounts of data. If the [state.checkpoints.num-retained|https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained] setting is too large, it can easily cause abnormalities in ZK/ETCD. The error log when state.checkpoints.num-retained is set to 1500:
{code}
Caused by: org.apache.flink.util.SerializedThrowable: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. Message: ConfigMap "xxx-jobmanager-leader" is invalid: 0J: Too long: must have at most 1048576 bytes. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=(StatusCause(field=[J, message=Too long: must have at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})l, group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, additionalProperties=(}), kind=Status, message=ConfigMap "xxx-jobmanager-leader" is invalid: [): Too long: must have at most 1048576 bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties=(}).
{code}
In Flink's code, all checkpoint metadata entries are updated at the same time, and the checkpoint metadata contains many repeated bytes, so it can achieve a very good compression ratio. Therefore, I suggest compressing the data when writing checkpoints and decompressing it when reading, to reduce storage pressure and improve IO efficiency. Here is sample code, which reduces the metadata size from 1M bytes to 30K:
{code:java}
// Map -> Json
ObjectMapper objectMapper = new ObjectMapper();
String checkpointJson = objectMapper.writeValueAsString(checkpointMap);

// compress and base64-encode
String compressedBase64 = compressAndEncode(checkpointJson);
compressedData.put("checkpoint-all", compressedBase64);

private static String compressAndEncode(String data) throws IOException {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
        gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
    }
    byte[] compressedData = outputStream.toByteArray();
    return Base64.getEncoder().encodeToString(compressedData);
}
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
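For FLINK-35502 above, a counterpart sketch for the read path, mirroring the compressAndEncode sample in the report with standard JDK APIs (the class name is hypothetical):
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;

public class CheckpointMetadataCodec {

    // inverse of compressAndEncode: base64-decode, then gunzip back to JSON
    static String decodeAndDecompress(String compressedBase64) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(compressedBase64);
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }
}
{code}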
[jira] [Created] (FLINK-35501) Use common thread pools when transferring RocksDB state files
Roman Khachatryan created FLINK-35501: - Summary: Use common thread pools when transferring RocksDB state files Key: FLINK-35501 URL: https://issues.apache.org/jira/browse/FLINK-35501 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.20.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.20.0 Currently, each RocksDB state backend creates an executor backed by its own thread pool. This makes it difficult to control the total number of threads per TM, because a TM might have at least one task per slot and, theoretically, many state backends per task (because of chaining). Additionally, using a common thread pool allows indirectly controlling the load on the underlying DFS (e.g. the total number of requests to S3 from a TM); a sketch follows this message. -- This message was sent by Atlassian Jira (v8.20.10#820010)
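For FLINK-35501 above, a generic sketch of the design choice (not Flink's actual implementation; the pool size is an arbitrary placeholder): one process-wide bounded pool shared by all state backends on a TaskManager, so thread count and DFS request rate are capped in one place:
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class SharedTransferPool {

    // a single bounded pool caps total transfer threads (and thus DFS load) per TM;
    // the size 4 is a hypothetical placeholder, not a recommended value
    private static final ExecutorService INSTANCE = Executors.newFixedThreadPool(4);

    private SharedTransferPool() {}

    public static ExecutorService get() {
        return INSTANCE;
    }
}
{code}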
[jira] [Created] (FLINK-35500) DynamoDb SinkWriter fails to delete elements due to key not found
Ahmed Hamdy created FLINK-35500: --- Summary: DynamoDb SinkWriter fails to delete elements due to key not found Key: FLINK-35500 URL: https://issues.apache.org/jira/browse/FLINK-35500 Project: Flink Issue Type: Bug Components: Connectors / DynamoDB Affects Versions: aws-connector-4.2.0, aws-connector-4.1.0, aws-connector-4.0.0 Reporter: Ahmed Hamdy Fix For: aws-connector-4.4.0
h2. Description
When DynamoDbSink is used with CDC sources, it fails to process {{DELETE}} records and throws {quote}org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The provided key element does not match the schema{quote} This is due to {{DynamoDbSinkWriter}} passing the whole DynamoDB item as the key instead of the constructed primary key [1]. Note: the issue was reported on the user mailing list [2].
h2. Steps to Reproduce
(1) Create a new DynamoDB table in AWS. Command line:
{code}
aws dynamodb create-table \
  --table-name orders \
  --attribute-definitions AttributeName=userId,AttributeType=S \
  --key-schema AttributeName=userId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST
{code}
(2) Create an input file in Debezium-JSON format with the following rows to start:
{code}
{"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
{"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
{"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
{"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
{code}
(3) Start the Flink SQL Client, and run the following, substituting in the proper local paths for the DynamoDB connector JAR file and for this local sample input file:
{code:sql}
ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
SET 'execution.runtime-mode' = 'streaming';
SET 'sql-client.execution.result-mode' = 'changelog';

CREATE TABLE Orders_CDC(
  orderId BIGINT,
  price float,
  userId STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/input_file.jsonl',
  'format' = 'debezium-json'
);

CREATE TABLE Orders_Dynamo (
  orderId BIGINT,
  price float,
  userId STRING,
  PRIMARY KEY (userId) NOT ENFORCED
) PARTITIONED BY ( userId ) WITH (
  'connector' = 'dynamodb',
  'table-name' = 'orders',
  'aws.region' = 'us-east-1'
);

INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
{code}
(4) At this point, we will see that things currently all work properly, and these 4 rows are inserted properly to Dynamo, because they are "Insert" operations. So far, so good!
(5) Now, add the following row to the input file. This represents a deletion in Debezium format, which should then cause a deletion on the corresponding DynamoDB table:
{code}
{"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
{code}
(6) Re-run the SQL statement:
{code:sql}
INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
{code}
h3. References
[1] https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
[2] https://lists.apache.org/thread/ysvctpvn6n9kn0qlf5b24gxchfg64ylf
-- This message was sent by Atlassian Jira (v8.20.10#820010)
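For FLINK-35500 above, a hedged sketch (hypothetical helper, not the actual connector patch) of the fix direction the report describes: build the delete request's key from only the primary-key attributes rather than the whole item:
{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class PrimaryKeyExtractor {

    // keep only the key attributes (e.g. ["userId"]) when issuing a delete
    static Map<String, AttributeValue> primaryKey(
            Map<String, AttributeValue> item, List<String> keyAttributeNames) {
        Map<String, AttributeValue> key = new HashMap<>();
        for (String name : keyAttributeNames) {
            key.put(name, item.get(name));
        }
        return key;
    }
}
{code}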
[jira] [Created] (FLINK-35499) EventTimeWindowCheckpointingITCase times out due to Checkpoint expired before completing
Ryan Skraba created FLINK-35499: --- Summary: EventTimeWindowCheckpointingITCase times out due to Checkpoint expired before completing Key: FLINK-35499 URL: https://issues.apache.org/jira/browse/FLINK-35499 Project: Flink Issue Type: Bug Affects Versions: 1.20.0 Reporter: Ryan Skraba * 1.20 AdaptiveScheduler / Test (module: tests) https://github.com/apache/flink/actions/runs/9311892945/job/25632037990#step:10:8702 * 1.20 Default (Java 8) / Test (module: tests) https://github.com/apache/flink/actions/runs/9275522134/job/25520829730#step:10:8264 Going into the logs, we see the following error occurs: {code:java} Test testTumblingTimeWindow[statebackend type =ROCKSDB_INCREMENTAL, buffersPerChannel = 2](org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase) is running. <...> 20:24:23,562 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering checkpoint 22 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1716927863562 for job 15d0a663cb415b09b9a68ccc40640c6d. 20:24:23,609 [jobmanager-io-thread-2] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed checkpoint 22 for job 15d0a663cb415b09b9a68ccc40640c6d (2349132 bytes, checkpointDuration=43 ms, finalizationTime=4 ms). 20:24:23,610 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering checkpoint 23 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1716927863610 for job 15d0a663cb415b09b9a68ccc40640c6d. 20:24:23,620 [jobmanager-io-thread-2] WARN org.apache.flink.runtime.jobmaster.JobMaster [] - Error while processing AcknowledgeCheckpoint message java.lang.IllegalStateException: Attempt to reference unknown state: a9a90973-4ee5-384f-acef-58a7c7560920 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at org.apache.flink.runtime.state.SharedStateRegistryImpl.registerReference(SharedStateRegistryImpl.java:97) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at org.apache.flink.runtime.state.SharedStateRegistry.registerReference(SharedStateRegistry.java:53) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle.registerSharedStates(IncrementalRemoteKeyedStateHandle.java:289) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.registerSharedState(OperatorSubtaskState.java:243) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.registerSharedStates(OperatorSubtaskState.java:226) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at org.apache.flink.runtime.checkpoint.TaskStateSnapshot.registerSharedStates(TaskStateSnapshot.java:193) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1245) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139) ~[flink-runtime-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:64) ~[flink-core-1.20-SNAPSHOT.jar:1.20-SNAPSHOT] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_392] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_392] at java.lang.Thread.run(Thread.java:750) [?:1.8.0_392] 20:24:23,663 [Source: Custom Source (1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task[] - Source: Custom Source (1/1)#1 (bc4de0d149fba0ca825771ff7eeae08d_bc764cd8ddf7a0cff126f51c16239658_0_1) switched from RUNNING to FINISHED. 20:24:23,663 [Source: Custom Source (1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task[] - Freeing task resources for Source: Custom Source (1/1)#1 (bc4de0d149fba0ca825771ff7eeae08d_bc764cd8ddf7a0cff126f51c16239658_0_1). 20:24:23,663 [flink-pekko.actor.default-dispatcher-8] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager fo
[jira] [Created] (FLINK-35498) Unexpected argument name conflict error when extracting method params from UDF
lincoln lee created FLINK-35498: --- Summary: Unexpected argument name conflict error when extracting method params from UDF Key: FLINK-35498 URL: https://issues.apache.org/jira/browse/FLINK-35498 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.19.0, 1.20.0 Reporter: lincoln lee Assignee: xuyang Follow these steps to reproduce the error. Test case:
{code}
util.addTemporarySystemFunction("myudf", new TestXyz)
util.tableEnv.explainSql("select myudf(f1, f2) from t")
{code}
UDF TestXyz:
{code}
public class TestXyz extends ScalarFunction {

    public String eval(String s1, String s2) {
        String localV1;
        if (s1 == null) {
            if (s2 != null) {
                localV1 = s2;
            } else {
                localV1 = s2 + s1;
            }
        } else {
            if ("xx".equals(s2)) {
                localV1 = s1.length() >= s2.length() ? s1 : s2;
            } else {
                localV1 = s1;
            }
        }
        if (s1 == null) {
            return s2 + localV1;
        }
        if (s2 == null) {
            return s1;
        }
        return s1.length() >= s2.length() ? s1 + localV1 : s2;
    }
}
{code}
Error stack:
{code}
Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method: public java.lang.String org.apache.flink.table.planner.runtime.utils.TestXyz.eval(java.lang.String,java.lang.String)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.BaseMappingExtractor.extractResultMappings(BaseMappingExtractor.java:154)
    at org.apache.flink.table.types.extraction.BaseMappingExtractor.extractOutputMapping(BaseMappingExtractor.java:100)
    ... 53 more
Caused by: org.apache.flink.table.api.ValidationException: Argument name conflict, there are at least two argument names that are the same.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:357)
    at org.apache.flink.table.types.extraction.FunctionSignatureTemplate.of(FunctionSignatureTemplate.java:73)
    at org.apache.flink.table.types.extraction.BaseMappingExtractor.lambda$createParameterSignatureExtraction$9(BaseMappingExtractor.java:381)
    at org.apache.flink.table.types.extraction.BaseMappingExtractor.putExtractedResultMappings(BaseMappingExtractor.java:298)
    at org.apache.flink.table.types.extraction.BaseMappingExtractor.collectMethodMappings(BaseMappingExtractor.java:244)
    at org.apache.flink.table.types.extraction.BaseMappingExtractor.extractResultMappings(BaseMappingExtractor.java:137)
    ... 54 more
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35497) The wrong enum value was used to get month in timestampDiff
haishui created FLINK-35497: --- Summary: The wrong enum value was used to get month in timestampDiff Key: FLINK-35497 URL: https://issues.apache.org/jira/browse/FLINK-35497 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: haishui In [SystemFunctionUtils#timestampDiff|https://github.com/apache/flink-cdc/blob/master/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java#L125]:
{code:java}
case "MONTH":
    return to.get(Calendar.YEAR) * 12 + to.get(Calendar.MONDAY)
            - (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONDAY));
{code}
Calendar.MONDAY should be replaced with Calendar.MONTH; see the corrected sketch after this message. This does not affect the calculation results, because Calendar.MONDAY = Calendar.MONTH = 2. -- This message was sent by Atlassian Jira (v8.20.10#820010)
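For FLINK-35497 above, the corrected computation as a self-contained sketch (the method name is hypothetical; in SystemFunctionUtils this is the "MONTH" branch of timestampDiff). Behavior is unchanged, since Calendar.MONDAY == Calendar.MONTH == 2; the change only removes the misleading constant:
{code:java}
import java.util.Calendar;

public class TimestampDiffFix {

    // Calendar.MONTH replaces the accidental (but numerically equal) Calendar.MONDAY
    static int monthDiff(Calendar from, Calendar to) {
        return to.get(Calendar.YEAR) * 12 + to.get(Calendar.MONTH)
                - (from.get(Calendar.YEAR) * 12 + from.get(Calendar.MONTH));
    }
}
{code}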
[jira] [Created] (FLINK-35496) The annotations of the new JDBC connector should be changed to non-Public/non-PublicEvolving
RocMarshal created FLINK-35496: -- Summary: The annotations of the new JDBC connector should be changed to non-Public/non-PublicEvolving Key: FLINK-35496 URL: https://issues.apache.org/jira/browse/FLINK-35496 Project: Flink Issue Type: Technical Debt Components: Connectors / JDBC Reporter: RocMarshal In general, we use the Experimental annotation instead of {{PublicEvolving}} or {{Public}} for new features or new APIs. But {{JdbcSink}} and JdbcSource (merged) were marked as {{PublicEvolving}} in the first version. [~fanrui] commented on this in the original PR [1]. [1] https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1621857589 CC [~eskabetxe] [~Sergey Nuyanzin] [~fanrui] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35495) The native metrics for column family are not reported
Yanfei Lei created FLINK-35495: -- Summary: The native metrics for column family are not reported Key: FLINK-35495 URL: https://issues.apache.org/jira/browse/FLINK-35495 Project: Flink Issue Type: Sub-task Reporter: Yanfei Lei -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35494) Reorganize sources
João Boto created FLINK-35494: - Summary: Reorganize sources Key: FLINK-35494 URL: https://issues.apache.org/jira/browse/FLINK-35494 Project: Flink Issue Type: Sub-task Reporter: João Boto -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35493) Make max history age and count configurable for FlinkStateSnapshot resources
Mate Czagany created FLINK-35493: Summary: Make max history age and count configurable for FlinkStateSnapshot resources Key: FLINK-35493 URL: https://issues.apache.org/jira/browse/FLINK-35493 Project: Flink Issue Type: Sub-task Components: Kubernetes Operator Reporter: Mate Czagany -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35492) Add metrics for FlinkStateSnapshot resources
Mate Czagany created FLINK-35492: Summary: Add metrics for FlinkStateSnapshot resources Key: FLINK-35492 URL: https://issues.apache.org/jira/browse/FLINK-35492 Project: Flink Issue Type: Sub-task Components: Kubernetes Operator Reporter: Mate Czagany -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35491) [JUnit5 Migration] Module: Flink CDC modules
Muhammet Orazov created FLINK-35491: --- Summary: [JUnit5 Migration] Module: Flink CDC modules Key: FLINK-35491 URL: https://issues.apache.org/jira/browse/FLINK-35491 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: Muhammet Orazov Migrate JUnit4 tests to JUnit5 for the following modules: * flink-cdc-common * flink-cdc-composer * flink-cdc-runtime * flink-cdc-connect/flink-cdc-pipeline-connectors * flink-cdc-e2e-tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35490) [JUnit5 Migration] Module: Flink CDC flink-cdc-connect/flink-cdc-source-connectors
Muhammet Orazov created FLINK-35490: --- Summary: [JUnit5 Migration] Module: Flink CDC flink-cdc-connect/flink-cdc-source-connectors Key: FLINK-35490 URL: https://issues.apache.org/jira/browse/FLINK-35490 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: Muhammet Orazov Migrate JUnit4 tests to JUnit5 in the following Flink CDC module: - flink-cdc-connect/flink-cdc-source-connectors -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35489) Add capability to set min taskmanager.memory.managed.size when enabling autotuning
Nicolas Fraison created FLINK-35489: --- Summary: Add capability to set min taskmanager.memory.managed.size when enabling autotuning Key: FLINK-35489 URL: https://issues.apache.org/jira/browse/FLINK-35489 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: 1.8.0 Reporter: Nicolas Fraison We have enabled the autotuning feature on one of our Flink jobs with the config below:
{code:java}
# Autoscaler configuration
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: 1m
job.autoscaler.metrics.window: 10m
job.autoscaler.target.utilization: "0.8"
job.autoscaler.target.utilization.boundary: "0.1"
job.autoscaler.restart.time: 2m
job.autoscaler.catch-up.duration: 10m
job.autoscaler.memory.tuning.enabled: true
job.autoscaler.memory.tuning.overhead: 0.5
job.autoscaler.memory.tuning.maximize-managed-memory: true
{code}
During a scale-down, the autotuning decided to give all the memory to the JVM (scaling the heap by 2), setting taskmanager.memory.managed.size to 0b. Here is the config that the autotuning computed for a TM running on a 4GB pod:
{code:java}
taskmanager.memory.network.max: 4063232b
taskmanager.memory.network.min: 4063232b
taskmanager.memory.jvm-overhead.max: 433791712b
taskmanager.memory.task.heap.size: 3699934605b
taskmanager.memory.framework.off-heap.size: 134217728b
taskmanager.memory.jvm-metaspace.size: 22960020b
taskmanager.memory.framework.heap.size: "0 bytes"
taskmanager.memory.flink.size: 3838215565b
taskmanager.memory.managed.size: 0b
{code}
This led to issues starting the TM, because we rely on a javaagent that performs some memory allocation outside of the JVM (it relies on some C bindings). Tuning the overhead or disabling scale-down-compensation.enabled could have helped for that particular event, but this can lead to other issues, as it could lead to too little heap size being computed. It would be interesting to be able to set a minimum memory.managed.size to be taken into account by the autotuning. What do you think about this? Do you think some other specific config should have been applied to avoid this issue? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35488) DataType Support Geometry Type
Leopold created FLINK-35488: --- Summary: DataType Support Geometry Type Key: FLINK-35488 URL: https://issues.apache.org/jira/browse/FLINK-35488 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: Leopold I want to sync data from MySQL to PostgreSQL, but I couldn't do it for fields of the Geometry datatype. MySQL geometry data can be transformed to a string by a spatial function, for example st_astext(geom). In the same way, PostgreSQL geometry data can also be transformed to a string. So I hope Flink supports transforming the geometry datatype between MySQL and PostgreSQL databases. Thanks! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35487) ContinuousFileProcessingCheckpointITCase crashed as process exit with code 127
Weijie Guo created FLINK-35487: -- Summary: ContinuousFileProcessingCheckpointITCase crashed as process exit with code 127 Key: FLINK-35487 URL: https://issues.apache.org/jira/browse/FLINK-35487 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35486) Potential sql expression generation issues on SQL gateway
Xingcan Cui created FLINK-35486: --- Summary: Potential sql expression generation issues on SQL gateway Key: FLINK-35486 URL: https://issues.apache.org/jira/browse/FLINK-35486 Project: Flink Issue Type: Bug Components: Table SQL / Gateway, Table SQL / Planner Affects Versions: 1.18.1 Reporter: Xingcan Cui We hit the following exception a few times when submitting queries to a session cluster through the Flink SQL gateway. When the same queries were submitted again, everything was fine, which suggests a possible concurrency problem in the expression code generator. (The generic parameters of the two ARRAY types appear to have been stripped from the log excerpt below, which is why the "mismatched" types print identically.)
{code:java}
"process.thread.name":"sql-gateway-operation-pool-thread-111",
"log.logger":"org.apache.flink.table.gateway.service.operation.OperationManager",
"error.type":"org.apache.flink.table.planner.codegen.CodeGenException",
"error.message":"Mismatch of expected output data type 'ARRAY NOT NULL>' and function's output type 'ARRAY NOT NULL>'.",
"error.stack_trace":"org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of expected output data type 'ARRAY NOT NULL>' and function's output type 'ARRAY NOT NULL>'.
 at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.verifyOutputType(BridgingFunctionGenUtil.scala:369)
 at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.verifyFunctionAwareOutputType(BridgingFunctionGenUtil.scala:359)
 at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:107)
 at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:84)
 at org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:79)
 at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:820)
 at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:481)
 at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:189)
 at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:478)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
 at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at scala.collection.TraversableLike.map(TraversableLike.scala:233)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:469)
 at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:189)
 at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:478)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
 at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at scala.collection.TraversableLike.map(TraversableLike.scala:233)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:469)
 at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:189)
 at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:134)
 at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$4(CalcCodeGenerator.scala:140)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
 at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at scala.collection.TraversableLike.map(TraversableLike.scala:233)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceP
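The intermittent behavior (failing once, then succeeding on resubmission) is consistent with a race. As a general illustration of the suspected hazard, and explicitly not Flink's actual code, the sketch below shows how an unsynchronized generator shared across gateway worker threads loses updates:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy stand-in for a code generator: hands out fresh identifiers from a counter.
public class SharedGeneratorRace {
    private int counter = 0;

    String newName() {
        return "expr$" + counter++; // read-then-increment is not atomic
    }

    public static void main(String[] args) throws Exception {
        SharedGeneratorRace gen = new SharedGeneratorRace();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        Set<String> seen = ConcurrentHashMap.newKeySet();
        List<Future<?>> tasks = new ArrayList<>();
        for (int t = 0; t < 8; t++) {
            tasks.add(pool.submit(() -> {
                for (int i = 0; i < 10_000; i++) {
                    seen.add(gen.newName());
                }
            }));
        }
        for (Future<?> f : tasks) {
            f.get();
        }
        pool.shutdown();
        // Without synchronization, threads observe the same counter value and
        // clash on names, so fewer than 80,000 distinct names come out.
        System.out.println("distinct names: " + seen.size() + " / 80000");
    }
}
{code}
If the gateway really does share mutable codegen state across its operation-pool threads, a per-operation generator or synchronization on the shared context would remove this kind of nondeterminism.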
[jira] [Created] (FLINK-35485) JobMaster failed with "the job xx has not been finished"
Xingcan Cui created FLINK-35485: --- Summary: JobMaster failed with "the job xx has not been finished" Key: FLINK-35485 URL: https://issues.apache.org/jira/browse/FLINK-35485 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.18.1 Reporter: Xingcan Cui We ran a session cluster on K8s and used the Flink SQL gateway to submit queries. We hit the following rare exception once, and it caused the job manager to restart. Judging from the trace, the JobMaster service process was closed during a leadership revocation while the job was still running, which surfaced as a JobNotFinishedException.
{code:java}
org.apache.flink.util.FlinkException: JobMaster for job 50d681ae1e8170f77b4341dda6aba9bc failed.
 at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1454)
 at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:776)
 at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$6(Dispatcher.java:698)
 at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
 at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
 at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
 at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
 at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
 at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
 at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
 at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
 at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
 at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
 at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
 at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
 at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
 at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
 at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
 at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
 at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
 at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
 at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
 at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
 at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
 at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.runtime.jobmaster.JobNotFinishedException: The job (50d681ae1e8170f77b4341dda6aba9bc) has not been finished.
 at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.closeAsync(DefaultJobMasterServiceProcess.java:157)
 at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.stopJobMasterServiceProcess(JobMasterServiceLeadershipRunner.java:431)
 at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.callIfRunning(JobMasterServiceLeadershipRunner.java:476)
 at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.lambda$stopJobMasterServiceProcessAsync$12(JobMasterServiceLeadershipRunner.java:407)
 at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(Unknown Source)
 at java.base/java.util.concurrent.CompletableFuture.thenCompose(Unknown Source)
 at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.stopJobMasterServiceProcessAsync(JobMasterServiceLeadershipRunner.java:405)
 at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.runIfStateRunning(JobMasterServiceLeadershipRunner.java:463)
 at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.revokeLeadership(JobMasterServiceLeadershipRunner.java:397)
 at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.notifyLeaderContenderOfLeadershipLoss(DefaultLeaderElectionService.java:484)
 at java.base/java.util.HashMap.forEach(Unknown Source)
 at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onRevokeLeadershipInternal(DefaultLeaderElectionServic