[jira] [Created] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-09 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-15152:
---

 Summary: Job running without periodic checkpoint for stop failed at the beginning
 Key: FLINK-15152
 URL: https://issues.apache.org/jira/browse/FLINK-15152
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.9.1
Reporter: Feng Jiajie


I have a streaming job configured with periodic checkpointing, but after it had been running for a week I found there wasn't a single checkpoint file.
h2. Reproduce the problem:
 # The job was submitted to YARN:

{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}

 # Then immediately, before all the tasks had switched to RUNNING (a matter of seconds), I (actually a job control script) sent a stop-with-savepoint command via the Flink CLI:
{code:java}
bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}

The job then continues to run normally, but without any checkpointing.
h2. The cause of the problem:
 # "stop with savepoint" command call the code 
stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612)
 and then triggerSynchronousSavepoint:

{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}

 # but "before all the task switch to RUNNING", checkpoint failed at 
org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509

{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
instead. Aborting checkpoint.",
  tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  job,
  ExecutionState.RUNNING,
  ee.getState());
throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}

 # finally, "stop with savepoint" failed, with 
"checkpointCoordinator.stopCheckpointScheduler()" but without the termination 
of the job.
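A possible direction for a fix (a sketch only, against the 1.9 code paths cited above; the parameter and executor names are assumptions, and the actual upstream fix may differ): restart the periodic scheduler whenever the synchronous savepoint fails while the job keeps running.
{code:java}
// Sketch inside LegacyScheduler's stop-with-savepoint path: if the
// synchronous savepoint cannot be triggered, re-enable the scheduler
// that was stopped above. startCheckpointScheduler() exists on
// CheckpointCoordinator; the surrounding wiring here is assumed.
checkpointCoordinator.stopCheckpointScheduler();

return checkpointCoordinator
        .triggerSynchronousSavepoint(timestamp, advanceToEndOfEventTime, targetDirectory)
        .handleAsync(
                (completedCheckpoint, throwable) -> {
                    if (throwable != null) {
                        // the job is still running, so periodic
                        // checkpoints must not stay disabled
                        checkpointCoordinator.startCheckpointScheduler();
                        throw new CompletionException(throwable);
                    }
                    return completedCheckpoint.getExternalPointer();
                },
                mainThreadExecutor);
{code}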

 

sample code:
{code:java}
public class StreamingJob {

  private static StateBackend makeRocksdbBackend() throws IOException {
    RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
    rocksdbBackend.enableTtlCompactionFilter();
    rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
    return rocksdbBackend;
  }

  public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 10 sec
    env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
    env.setStateBackend(makeRocksdbBackend());
    env.setRestartStrategy(RestartStrategies.noRestart());

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    checkpointConfig.setFailOnCheckpointingErrors(true);

    DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
    text.map(new MapFunction<String, Tuple2<Long, Long>>() {
      @Override
      public Tuple2<Long, Long> map(String s) {
        String[] s1 = s.split(" ");
        return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
      }
    }).keyBy(0).flatMap(new CountWindowAverage()).print();

    env.execute("Flink Streaming Java API Skeleton");
  }

  public static class CountWindowAverage
      extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
      Tuple2<Long, Long> currentSum = sum.value();
      currentSum.f0 += 1;
      currentSum.f1 += input.f1;
      sum.update(currentSum);
      out.collect(new Tuple2<>(input.f0, currentSum.f1));
    }

    @Override
    public void open(Configuration config) {
      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
          new ValueStateDescriptor<>(
              "average", // the state name
              TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
              Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
      sum = getRuntimeContext().getState(descriptor);
    }
  }
}
{code}





[jira] [Created] (FLINK-33171) Table SQL support Not Equal for TimePoint type and TimeString

2023-09-30 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-33171:
---

 Summary: Table SQL support Not Equal for TimePoint type and TimeString
 Key: FLINK-33171
 URL: https://issues.apache.org/jira/browse/FLINK-33171
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.18.0
Reporter: Feng Jiajie
 Fix For: 1.17.2, 1.18.1


When executing the following SQL:
{code:sql}
SELECT
time1,
time1 = '2023-09-30 18:22:42.123' AS eq1,
NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1
FROM table1;
{code}
the result is as follows:
{code:java}
+----+-------------------------+--------+--------+
| op |                   time1 |    eq1 | notEq1 |
+----+-------------------------+--------+--------+
| +I | 2023-09-30 18:22:42.123 |   TRUE |   TRUE |
| +I | 2023-09-30 18:22:42.124 |  FALSE |   TRUE |
+----+-------------------------+--------+--------+
2 rows in set
{code}
The "notEq1" in the first row should be FALSE.

Here is the reproducing code:
{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TimePointNotEqualTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.setParallelism(1);

        DataStreamSource<Long> longDataStreamSource = env.fromSequence(0, 1);
        RowTypeInfo rowTypeInfo =
                new RowTypeInfo(new TypeInformation[] {Types.LONG}, new String[] {"time1"});
        SingleOutputStreamOperator<Row> map =
                longDataStreamSource.map(new RichMapFunction<Long, Row>() {
                    @Override
                    public Row map(Long value) {
                        Row row = new Row(1);
                        row.setField(0, 1696069362123L + value);
                        return row;
                    }
                }, rowTypeInfo);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Schema schema = Schema.newBuilder()
                .column("time1", DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
                .build();
        tableEnv.createTemporaryView("table1", map, schema);

        tableEnv.sqlQuery("SELECT "
                + "time1," // 2023-09-30 18:22:42.123
                + "time1 = '2023-09-30 18:22:42.123' AS eq1," // expect TRUE
                + "NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 " // expect FALSE but TRUE
                + "FROM table1").execute().print();
    }
}
{code}
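Until the planner handles the TimePoint/TimeString case, a possible workaround (a sketch against the repro above; it assumes string-to-TIMESTAMP_LTZ casts are available in these versions) is to compare against an explicitly cast timestamp, so that NOT negates an ordinary timestamp comparison:
{code:java}
// Hedged workaround sketch: cast the literal so both operands are
// TIMESTAMP_LTZ(3) before comparing; notEq1 then returns FALSE in the
// first row as expected. Reuses the table1 view registered above.
tableEnv.sqlQuery("SELECT "
        + "time1, "
        + "time1 = CAST('2023-09-30 18:22:42.123' AS TIMESTAMP_LTZ(3)) AS eq1, "
        + "NOT (time1 = CAST('2023-09-30 18:22:42.123' AS TIMESTAMP_LTZ(3))) AS notEq1 "
        + "FROM table1").execute().print();
{code}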
I would like to attempt to fix this issue. If possible, please assign the issue 
to me. Thank you.





[jira] [Created] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss

2023-10-25 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-33360:
---

 Summary: HybridSource fails to clear the previous round's state when switching sources, leading to data loss
 Key: FLINK-33360
 URL: https://issues.apache.org/jira/browse/FLINK-33360
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HybridSource
Affects Versions: 1.17.1, 1.16.2
Reporter: Feng Jiajie
 Fix For: 1.17.3


org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
{code:java}
            // track readers that have finished processing for current 
enumerator
            finishedReaders.add(subtaskId);
            if (finishedReaders.size() == context.currentParallelism()) {
                LOG.debug("All readers finished, ready to switch enumerator!");
                if (currentSourceIndex + 1 < sources.size()) {
                    switchEnumerator();
                    // switch all readers prior to sending split assignments
                    for (int i = 0; i < context.currentParallelism(); i++) {
                        sendSwitchSourceEvent(i, currentSourceIndex);
                    }
                }
            } {code}
I think *finishedReaders* is used to keep track of all the subtask IDs that have finished reading the current round of the source. Therefore, *finishedReaders* should be cleared in the *switchEnumerator* function.
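A minimal sketch of that change inside HybridSourceSplitEnumerator (only the reset is shown; the existing switching logic is elided):
{code:java}
private void switchEnumerator() {
    // Sketch: reset the finished-reader bookkeeping when moving to the
    // next source, so the next round counts its own readers from zero
    // instead of reusing the previous round's completed set.
    finishedReaders.clear();

    // ... existing logic that tears down the current enumerator and
    // starts the one for the next source ...
}
{code}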

If it is not cleared, then while reading the next source, as soon as any single SourceReader reports a *SourceReaderFinishedEvent* (while the other parallel SourceReaders may still be reading), the condition *finishedReaders.size() == context.currentParallelism()* is already satisfied, so *sendSwitchSourceEvent(i, currentSourceIndex)* fires and a *SwitchSourceEvent* is sent to all SourceReaders.
If a SourceReader receives a SwitchSourceEvent before it has finished reading the previous source, it executes *currentReader.close()* while some data has not yet been read, resulting in partial data loss from the source.





[jira] [Created] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode

2024-01-03 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-33981:
---

 Summary: File Descriptor References Not Released After Job Execution in MiniCluster Mode
 Key: FLINK-33981
 URL: https://issues.apache.org/jira/browse/FLINK-33981
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.18.0
Reporter: Feng Jiajie


When using MiniCluster mode, file descriptors such as 
{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not 
released after a job completes. Executing multiple jobs in the same JVM can 
therefore leave behind file descriptors, potentially leading to problems.

After executing the reproducing code provided below (once it has entered the final sleep), running lsof -p 18162 reveals:
{code:java}
...
java    18162 sa_cluster   30r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   31r   DIR  253,1        0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   32r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   33r   DIR  253,1        0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   34r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   35r   DIR  253,1        0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   36r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   37r   DIR  253,1        0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   38r   DIR  253,1        0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}
The code used for reproduction is as follows:

 
{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

  public static void main(String[] args) throws Exception {
    for (int i = 0; i < 10; ++i) {
      int round = i;
      Thread thread = new Thread(() -> {
        try {
          Configuration configuration = new Configuration();
          final StreamExecutionEnvironment env =
              StreamExecutionEnvironment.getExecutionEnvironment(configuration);
          env.setParallelism(1);

          DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
          longDataStreamSource.addSink(new DiscardingSink<>());

          StreamGraph streamGraph = env.getStreamGraph();
          streamGraph.setJobName("test-" + System.nanoTime());
          JobClient jobClient = env.executeAsync(streamGraph);

          CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
              jobClient.getJobExecutionResult();
          JobExecutionResult jobExecutionResult = null;
          while (jobExecutionResult == null) {
            try {
              jobExecutionResult = jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
            } catch (TimeoutException timeoutException) {
              // ignore and poll again until the job finishes
            }
          }
          System.out.println("finished round: " + round);
          env.close();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });

      thread.setDaemon(true);
      thread.start();
      thread.join();

      System.out.println("done ... " + i);
    }

    // === lsof -p 18162
    Thread.sleep(500_000_000);
  }
}
{code}
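For convenience, the descriptor count can also be observed from inside the JVM instead of running lsof. A small diagnostic sketch (the class name is illustrative, and it is Linux-specific since it reads /proc/self/fd) that could be called after each round:
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

/** Diagnostic sketch: counts this JVM's open file descriptors by
 *  listing /proc/self/fd (Linux only). Printing this after each round
 *  above makes leaked localState descriptors visible over time. */
public class FdCount {
  public static long openFds() throws IOException {
    try (Stream<Path> fds = Files.list(Paths.get("/proc/self/fd"))) {
      return fds.count();
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println("open fds: " + openFds());
  }
}
{code}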
 

 





[jira] [Created] (FLINK-35737) Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown

2024-07-01 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-35737:
---

 Summary: Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown
 Key: FLINK-35737
 URL: https://issues.apache.org/jira/browse/FLINK-35737
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.18.1
Reporter: Feng Jiajie
 Fix For: 1.18.2, 1.20.0, 1.19.2


MemoryExecutionGraphInfoStore registers a ShutdownHook upon construction and 
deregisters it within its close() method.
{code:java}
public MemoryExecutionGraphInfoStore(...) {
    ...
    this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);
}

@Override
public void close() throws IOException {
    ...
    // Remove shutdown hook to prevent resource leaks
    ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
}{code}
Currently, MiniCluster instantiates a MemoryExecutionGraphInfoStore object but 
doesn't retain a reference to it, nor does it call close() during its own 
shutdown process.
{code:java}
        final DispatcherResourceManagerComponent dispatcherResourceManagerComponent =
                dispatcherResourceManagerComponentFactory.create(
                        ...
                        new MemoryExecutionGraphInfoStore(),  // -> new
                        ...); {code}
This behavior leads to an accumulation of ShutdownHooks when running multiple 
Flink jobs within the same local JVM. These accumulating hooks, along with 
their associated references, contribute to a memory leak.

This patch addresses the issue by ensuring that MemoryExecutionGraphInfoStore's 
close() method is invoked during MiniCluster shutdown.
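The underlying pattern can be demonstrated in isolation. Below is a self-contained sketch in which plain Runtime hooks stand in for Flink's ShutdownHookUtil and all class names are illustrative:
{code:java}
import java.io.Closeable;

/** Sketch of the leak pattern: a resource that registers a JVM shutdown
 *  hook on construction stays reachable (hook included) until its owner
 *  calls close(). */
public class HookLeakDemo {

    static class Store implements Closeable {
        private final Thread shutdownHook = new Thread(this::onJvmExit);

        Store() {
            // registered on construction, as in MemoryExecutionGraphInfoStore
            Runtime.getRuntime().addShutdownHook(shutdownHook);
        }

        private void onJvmExit() {
            // cleanup that should run if the JVM exits abruptly
        }

        @Override
        public void close() {
            // mirrors MemoryExecutionGraphInfoStore#close(): deregistering
            // the hook makes this instance garbage-collectable
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1_000; i++) {
            Store store = new Store();
            // without this close(), 1000 hooks and the stores they
            // reference accumulate until JVM exit, as in MiniCluster
            store.close();
        }
    }
}
{code}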





[jira] [Created] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1

2019-12-17 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-15308:
---

 Summary: Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
 Key: FLINK-15308
 URL: https://issues.apache.org/jira/browse/FLINK-15308
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.10.0
 Environment: $ git log
commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae
Author: bowen.li 
Date: Tue Dec 17 17:37:03 2019 -0800
Reporter: Feng Jiajie


The job worked well with the default flink-conf.yaml plus pipelined-shuffle compression enabled:
{code:java}
taskmanager.numberOfTaskSlots: 1
taskmanager.network.pipelined-shuffle.compression.enabled: true
{code}
But when I set taskmanager.numberOfTaskSlots to 4 or 6:
{code:java}
taskmanager.numberOfTaskSlots: 6
taskmanager.network.pipelined-shuffle.compression.enabled: true
{code}
the job failed:
{code:java}
$ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m ~/flink-example-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli - The configuration directory ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli - The configuration directory ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2019-12-18 15:04:40,907 INFO  org.apache.flink.yarn.YarnClusterDescriptor - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-12-18 15:04:41,084 INFO  org.apache.flink.yarn.YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=12288, numberTaskManagers=1, slotsPerTaskManager=6}
2019-12-18 15:04:42,344 INFO  org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1576573857638_0026
2019-12-18 15:04:42,370 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1576573857638_0026
2019-12-18 15:04:42,371 INFO  org.apache.flink.yarn.YarnClusterDescriptor - Waiting for the cluster to be allocated
2019-12-18 15:04:42,372 INFO  org.apache.flink.yarn.YarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-12-18 15:04:45,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor - YARN application has been deployed successfully.
2019-12-18 15:04:45,390 INFO  org.apache.flink.yarn.YarnClusterDescriptor - Found Web Interface debugboxcreate431x3.sa:36162 of application 'application_1576573857638_0026'.
Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9140c70769f4271cc22ea8becaa26272)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
    at org.apache.flink.runtime.security.HadoopSecurityContext.run