[jira] [Created] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning
Feng Jiajie created FLINK-15152:
---

Summary: Job running without periodic checkpoint for stop failed at the beginning
Key: FLINK-15152
URL: https://issues.apache.org/jira/browse/FLINK-15152
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.9.1
Reporter: Feng Jiajie

I have a streaming job configured with periodic checkpointing, but after it had been running for a week I found that no checkpoint files had been written.

h2. Reproduce the problem:

# Submit the job to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}
# Immediately afterwards, before all tasks have switched to RUNNING (within a few seconds), I (actually a job control script) send a stop-with-savepoint command through the Flink CLI:
{code:java}
bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}

The job's tasks then continue to run normally, but no checkpoints are taken.

h2. The cause of the problem:

# The "stop with savepoint" command calls stopCheckpointScheduler (org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) and then triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
# But because not all tasks have switched to RUNNING yet, the checkpoint fails at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509:
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
    job,
    ExecutionState.RUNNING,
    ee.getState());
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
# Finally, "stop with savepoint" fails after checkpointCoordinator.stopCheckpointScheduler() has already been called, but the job is not terminated, so it keeps running with the checkpoint scheduler stopped.

Sample code:
{code:java}
import java.io.IOException;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingJob {

    private static StateBackend makeRocksdbBackend() throws IOException {
        RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
        rocksdbBackend.enableTtlCompactionFilter();
        rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        return rocksdbBackend;
    }

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 10 sec
        env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
        env.setStateBackend(makeRocksdbBackend());
        env.setRestartStrategy(RestartStrategies.noRestart());

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setFailOnCheckpointingErrors(true);

        DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
        text.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String s) {
                String[] s1 = s.split(" ");
                return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
            }
        }).keyBy(0).flatMap(new CountWindowAverage()).print();

        env.execute("Flink Streaming Java API Skeleton");
    }

    public static class CountWindowAverage
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
            Tuple2<Long, Long> currentSum = sum.value();
            currentSum.f0 += 1;
            currentSum.f1 += input.f1;
            sum.update(currentSum);
            out.collect(new Tuple2<>(input.f0, currentSum.f1));
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "average", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }), // type information
                            Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
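The sequence reported in FLINK-15152 can be illustrated with a small toy model. This is only a sketch: ToyCheckpointCoordinator and StopWithSavepointSketch are hypothetical stand-ins, not Flink classes, and restarting the scheduler when the savepoint fails is shown as one possible remedy, not as the fix that was actually applied.

```java
// Hypothetical stand-in for the checkpoint coordinator; NOT Flink's real class.
class ToyCheckpointCoordinator {
    private boolean periodicSchedulerRunning = true;
    private final boolean allTasksRunning;

    ToyCheckpointCoordinator(boolean allTasksRunning) {
        this.allTasksRunning = allTasksRunning;
    }

    void stopCheckpointScheduler() {
        periodicSchedulerRunning = false;
    }

    void startCheckpointScheduler() {
        periodicSchedulerRunning = true;
    }

    boolean isPeriodicSchedulerRunning() {
        return periodicSchedulerRunning;
    }

    // Fails the same way the report describes when some task is not RUNNING yet.
    void triggerSynchronousSavepoint() {
        if (!allTasksRunning) {
            throw new IllegalStateException("NOT_ALL_REQUIRED_TASKS_RUNNING");
        }
    }
}

class StopWithSavepointSketch {

    // Mirrors the reported sequence: the scheduler is stopped, the savepoint
    // fails, and nothing turns the scheduler back on while the job keeps running.
    static boolean stopAsReported(ToyCheckpointCoordinator coordinator) {
        coordinator.stopCheckpointScheduler();
        try {
            coordinator.triggerSynchronousSavepoint();
        } catch (IllegalStateException e) {
            // savepoint failed; the job is not terminated
        }
        return coordinator.isPeriodicSchedulerRunning();
    }

    // Assumed remedy: restart the periodic scheduler if the savepoint fails.
    static boolean stopWithRestartOnFailure(ToyCheckpointCoordinator coordinator) {
        coordinator.stopCheckpointScheduler();
        try {
            coordinator.triggerSynchronousSavepoint();
        } catch (IllegalStateException e) {
            coordinator.startCheckpointScheduler();
        }
        return coordinator.isPeriodicSchedulerRunning();
    }

    public static void main(String[] args) {
        System.out.println("scheduler running, as reported: "
                + stopAsReported(new ToyCheckpointCoordinator(false)));
        System.out.println("scheduler running, with restart on failure: "
                + stopWithRestartOnFailure(new ToyCheckpointCoordinator(false)));
    }
}
```

In the first variant the job is left running with periodic checkpointing switched off, which matches the symptom in the report.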
[jira] [Created] (FLINK-33171) Table SQL support Not Equal for TimePoint type and TimeString
Feng Jiajie created FLINK-33171:
---

Summary: Table SQL support Not Equal for TimePoint type and TimeString
Key: FLINK-33171
URL: https://issues.apache.org/jira/browse/FLINK-33171
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.18.0
Reporter: Feng Jiajie
Fix For: 1.17.2, 1.18.1

When executing the following SQL:
{code:sql}
SELECT
    time1,
    time1 = '2023-09-30 18:22:42.123' AS eq1,
    NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1
FROM table1;
{code}
the result is as follows:
{code:java}
+----+-------------------------+-------+--------+
| op |                   time1 |   eq1 | notEq1 |
+----+-------------------------+-------+--------+
| +I | 2023-09-30 18:22:42.123 |  TRUE |   TRUE |
| +I | 2023-09-30 18:22:42.124 | FALSE |   TRUE |
+----+-------------------------+-------+--------+
2 rows in set
{code}
The "notEq1" in the first row should be FALSE.

Here is the reproducing code:
{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TimePointNotEqualTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.setParallelism(1);

        DataStreamSource<Long> longDataStreamSource = env.fromSequence(0, 1);
        RowTypeInfo rowTypeInfo =
                new RowTypeInfo(new TypeInformation[] {Types.LONG}, new String[] {"time1"});
        SingleOutputStreamOperator<Row> map =
                longDataStreamSource.map(new RichMapFunction<Long, Row>() {
                    @Override
                    public Row map(Long value) {
                        Row row = new Row(1);
                        row.setField(0, 1696069362123L + value);
                        return row;
                    }
                }, rowTypeInfo);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Schema schema = Schema.newBuilder()
                .column("time1", DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
                .build();
        tableEnv.createTemporaryView("table1", map, schema);

        tableEnv.sqlQuery("SELECT "
                + "time1," // 2023-09-30 18:22:42.123
                + "time1 = '2023-09-30 18:22:42.123' AS eq1," // expect TRUE
                + "NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 " // expect FALSE, but got TRUE
                + "FROM table1").execute().print();
    }
}
{code}
I would like to attempt to fix this issue. If possible, please assign the issue to me. Thank you.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
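For readers checking the FLINK-33171 reproducer above: the first row's epoch value, 1696069362123L, renders as the literal used in the query only in a UTC+8 zone. The sketch below uses plain java.time rather than Flink. The zone Asia/Shanghai is an assumption inferred from the printed result, not something the report states, and EpochToLocalTimestamp is a hypothetical helper name.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: renders epoch milliseconds the way
// a TIMESTAMP_LTZ(3) value is displayed in a given session time zone.
class EpochToLocalTimestamp {

    static String format(long epochMillis, String zoneId) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
                .withZone(ZoneId.of(zoneId))
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Assumption: the session time zone is UTC+8 (Asia/Shanghai).
        System.out.println(format(1696069362123L, "Asia/Shanghai"));
        System.out.println(format(1696069362123L + 1, "Asia/Shanghai"));
    }
}
```

Under that assumption the first value matches the string literal in the query exactly, so eq1 = TRUE is the correct result for the first row and notEq1 should be its negation.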
[jira] [Created] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss
Feng Jiajie created FLINK-33360:
---

Summary: HybridSource fails to clear the previous round's state when switching sources, leading to data loss
Key: FLINK-33360
URL: https://issues.apache.org/jira/browse/FLINK-33360
Project: Flink
Issue Type: Bug
Components: Connectors / HybridSource
Affects Versions: 1.17.1, 1.16.2
Reporter: Feng Jiajie
Fix For: 1.7.3

org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
{code:java}
// track readers that have finished processing for current enumerator
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
    LOG.debug("All readers finished, ready to switch enumerator!");
    if (currentSourceIndex + 1 < sources.size()) {
        switchEnumerator();
        // switch all readers prior to sending split assignments
        for (int i = 0; i < context.currentParallelism(); i++) {
            sendSwitchSourceEvent(i, currentSourceIndex);
        }
    }
}
{code}
I think *finishedReaders* is meant to track the subtask IDs that have finished reading the current source. If so, it should be cleared in the *switchEnumerator* function.

If it is not cleared, the set still holds every subtask ID from the previous round while the next source is being read. As soon as any SourceReader reports a *SourceReaderFinishedEvent* (while other SourceReaders may still be processing in parallel), the condition *finishedReaders.size() == context.currentParallelism()* is satisfied again. This triggers {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), which sends a *SwitchSourceEvent* to all SourceReaders.

A SourceReader that receives a SwitchSourceEvent before it has finished reading the previous source executes {*}currentReader.close(){*}, so some of that source's data may never be read, resulting in partial data loss.
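The premature switch described in FLINK-33360 can be reproduced in a toy model. This is a sketch only: ToyHybridEnumerator is a hypothetical class that copies just the bookkeeping from the snippet in the report, and the clearOnSwitch flag is introduced here purely to compare the behaviour with and without the proposed clear.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model of the enumerator's bookkeeping; NOT Flink's class.
class ToyHybridEnumerator {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private final int sourceCount;
    private final boolean clearOnSwitch;
    private int currentSourceIndex = 0;

    ToyHybridEnumerator(int parallelism, int sourceCount, boolean clearOnSwitch) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
        this.clearOnSwitch = clearOnSwitch;
    }

    // Same condition as in the reported snippet.
    void onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism) {
            if (currentSourceIndex + 1 < sourceCount) {
                switchEnumerator();
                // The real enumerator would now send a SwitchSourceEvent to every reader.
            }
        }
    }

    private void switchEnumerator() {
        currentSourceIndex++;
        if (clearOnSwitch) {
            // Proposed change: forget the readers that finished the previous source.
            finishedReaders.clear();
        }
    }

    int getCurrentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        for (boolean clear : new boolean[] {false, true}) {
            ToyHybridEnumerator enumerator = new ToyHybridEnumerator(2, 3, clear);
            enumerator.onReaderFinished(0); // reader 0 finishes source 0
            enumerator.onReaderFinished(1); // reader 1 finishes source 0, switch to source 1
            enumerator.onReaderFinished(0); // only reader 0 has finished source 1
            System.out.println("clearOnSwitch=" + clear
                    + " -> current source index " + enumerator.getCurrentSourceIndex());
        }
    }
}
```

With two readers and three sources, the variant that never clears the set moves on to the third source as soon as one reader finishes the second, while the other reader is still reading it.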
[jira] [Created] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode
Feng Jiajie created FLINK-33981:
---

Summary: File Descriptor References Not Released After Job Execution in MiniCluster Mode
Key: FLINK-33981
URL: https://issues.apache.org/jira/browse/FLINK-33981
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.18.0
Reporter: Feng Jiajie

When using MiniCluster mode, file descriptors like {{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not released after a job completes. Running multiple jobs in the same JVM can therefore leave file descriptors behind after every job, potentially leading to problems.

After running the reproducing code below and waiting until it reaches the final sleep, {{lsof -p 18162}} shows:
{code:java}
...
java 18162 sa_cluster 30r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java 18162 sa_cluster 31r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java 18162 sa_cluster 32r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java 18162 sa_cluster 33r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java 18162 sa_cluster 34r DIR 253,1 0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java 18162 sa_cluster 35r DIR 253,1 0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java 18162 sa_cluster 36r DIR 253,1 0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java 18162 sa_cluster 37r DIR 253,1 0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java 18162 sa_cluster 38r DIR 253,1 0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}
The code used for reproduction is as follows:
{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; ++i) {
            int round = i;
            Thread thread = new Thread(() -> {
                try {
                    Configuration configuration = new Configuration();
                    final StreamExecutionEnvironment env =
                            StreamExecutionEnvironment.getExecutionEnvironment(configuration);
                    env.setParallelism(1);

                    DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
                    longDataStreamSource.addSink(new DiscardingSink<>());

                    StreamGraph streamGraph = env.getStreamGraph();
                    streamGraph.setJobName("test-" + System.nanoTime());
                    JobClient jobClient = env.executeAsync(streamGraph);

                    CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
                            jobClient.getJobExecutionResult();
                    JobExecutionResult jobExecutionResult = null;
                    while (jobExecutionResult == null) {
                        try {
                            jobExecutionResult =
                                    jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
                        } catch (TimeoutException timeoutException) {
                            // ignore
                        }
                    }
                    System.out.println("finished round: " + round);
                    env.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

            thread.setDaemon(true);
            thread.start();
            thread.join();

            System.out.println("done ... " + i);
        }

        // === lsof -p 18162
        Thread.sleep(500_000_000);
    }
}
{code}
[jira] [Created] (FLINK-35737) Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown
Feng Jiajie created FLINK-35737:
---

Summary: Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown
Key: FLINK-35737
URL: https://issues.apache.org/jira/browse/FLINK-35737
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 1.18.1
Reporter: Feng Jiajie
Fix For: 1.18.2, 1.20.0, 1.19.2

MemoryExecutionGraphInfoStore registers a ShutdownHook upon construction and deregisters it within its close() method.
{code:java}
public MemoryExecutionGraphInfoStore(...) {
    ...
    this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);
}

@Override
public void close() throws IOException {
    ...
    // Remove shutdown hook to prevent resource leaks
    ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
}{code}
Currently, MiniCluster instantiates a MemoryExecutionGraphInfoStore object but doesn't retain a reference to it, nor does it call close() during its own shutdown process.
{code:java}
final DispatcherResourceManagerComponent dispatcherResourceManagerComponent =
        dispatcherResourceManagerComponentFactory.create(
                ...
                new MemoryExecutionGraphInfoStore(), // -> new
                ...);
{code}
This behavior leads to an accumulation of ShutdownHooks when running multiple Flink jobs within the same local JVM. These accumulating hooks, along with their associated references, contribute to a memory leak.

This patch addresses the issue by ensuring that MemoryExecutionGraphInfoStore's close() method is invoked during MiniCluster shutdown.
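The ownership pattern that FLINK-35737 asks for can be sketched with the plain JDK shutdown-hook API. HookedStore and ToyMiniCluster are hypothetical stand-ins, not Flink's classes. The hook here is a no-op, whereas the store's real hook is registered through ShutdownHookUtil as shown in the report.

```java
// Hypothetical stand-in for a store that registers a JVM shutdown hook.
class HookedStore implements AutoCloseable {
    private final Thread shutdownHook;
    private boolean hookRemoved = false;

    HookedStore() {
        // No-op hook for illustration only.
        shutdownHook = new Thread(() -> { }, "HookedStore-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    @Override
    public void close() {
        // removeShutdownHook returns true if the hook was registered and has now been removed.
        hookRemoved = Runtime.getRuntime().removeShutdownHook(shutdownHook);
    }

    boolean isHookRemoved() {
        return hookRemoved;
    }
}

// Hypothetical owner: keeps a reference to the store so it can close it on shutdown.
class ToyMiniCluster implements AutoCloseable {
    private final HookedStore store = new HookedStore();

    @Override
    public void close() {
        store.close();
    }

    boolean storeHookRemoved() {
        return store.isHookRemoved();
    }

    public static void main(String[] args) {
        ToyMiniCluster cluster = new ToyMiniCluster();
        cluster.close();
        System.out.println("hook removed on close: " + cluster.storeHookRemoved());
    }
}
```

A registered hook stays strongly referenced by the JVM until it is removed or the JVM exits, so an owner that drops its reference without calling close() leaves one hook behind per instance.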
[jira] [Created] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
Feng Jiajie created FLINK-15308:
---

Summary: Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
Key: FLINK-15308
URL: https://issues.apache.org/jira/browse/FLINK-15308
Project: Flink
Issue Type: Bug
Components: Runtime / Network
Affects Versions: 1.10.0
Environment: $ git log
commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae
Author: bowen.li
Date: Tue Dec 17 17:37:03 2019 -0800
Reporter: Feng Jiajie

The job works well with the default flink-conf.yaml plus pipelined-shuffle compression enabled:
{code:java}
taskmanager.numberOfTaskSlots: 1
taskmanager.network.pipelined-shuffle.compression.enabled: true
{code}
But when I set taskmanager.numberOfTaskSlots to 4 or 6:
{code:java}
taskmanager.numberOfTaskSlots: 6
taskmanager.network.pipelined-shuffle.compression.enabled: true
{code}
the job fails:
{code:java}
$ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m ~/flink-example-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-12-18 15:04:40,514 WARN org.apache.flink.yarn.cli.FlinkYarnSessionCli - The configuration directory ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2019-12-18 15:04:40,514 WARN org.apache.flink.yarn.cli.FlinkYarnSessionCli - The configuration directory ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2019-12-18 15:04:40,907 INFO org.apache.flink.yarn.YarnClusterDescriptor - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-12-18 15:04:41,084 INFO org.apache.flink.yarn.YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=12288, numberTaskManagers=1, slotsPerTaskManager=6}
2019-12-18 15:04:42,344 INFO org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1576573857638_0026
2019-12-18 15:04:42,370 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1576573857638_0026
2019-12-18 15:04:42,371 INFO org.apache.flink.yarn.YarnClusterDescriptor - Waiting for the cluster to be allocated
2019-12-18 15:04:42,372 INFO org.apache.flink.yarn.YarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-12-18 15:04:45,388 INFO org.apache.flink.yarn.YarnClusterDescriptor - YARN application has been deployed successfully.
2019-12-18 15:04:45,390 INFO org.apache.flink.yarn.YarnClusterDescriptor - Found Web Interface debugboxcreate431x3.sa:36162 of application 'application_1576573857638_0026'.
Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9140c70769f4271cc22ea8becaa26272)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
    at org.apache.flink.runtime.security.HadoopSecurityContext.run