[jira] [Created] (FLINK-32809) YarnClusterDescriptor.isArchiveOnlyIncludedInShipArchiveFiles does not work as expected

2023-08-08 Thread junzhong qin (Jira)
junzhong qin created FLINK-32809:


 Summary: YarnClusterDescriptor.isArchiveOnlyIncludedInShipArchiveFiles does not work as expected
 Key: FLINK-32809
 URL: https://issues.apache.org/jira/browse/FLINK-32809
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.17.1, 1.16.2, 1.18.0
Reporter: junzhong qin


YarnClusterDescriptor.isArchiveOnlyIncludedInShipArchiveFiles(List<File> shipFiles) checks whether the shipFiles are all archive files, but it does not work as expected.

 
{code:java}
public static boolean isArchiveOnlyIncludedInShipArchiveFiles(List<File> shipFiles) {
    return shipFiles.stream()
            .filter(File::isFile)
            .map(File::getName)
            .map(String::toLowerCase)
            .allMatch(
                    name ->
                            name.endsWith(".tar.gz")
                                    || name.endsWith(".tar")
                                    || name.endsWith(".tgz")
                                    || name.endsWith(".dst")
                                    || name.endsWith(".jar")
                                    || name.endsWith(".zip"));
} {code}
 

 

When we pass a directory and an archive file, the method should return false, but it returns true.

 
{code:java}
// dir1 is a directory and archive.zip is an archive file
List<File> files = Arrays.asList(new File("/tmp/dir1"), new File("/tmp/archive.zip"));
boolean result = isArchiveOnlyIncludedInShipArchiveFiles(files);
System.out.println(result); // Prints true, but it should print false{code}
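
A possible stricter variant (a sketch only, not necessarily how it should be fixed in Flink) would check the isFile() property together with the suffix, so that a directory in the list makes the method return false:
{code:java}
// Sketch of a stricter check: a directory (or any non-regular file) now fails the match.
public static boolean isArchiveOnlyIncludedInShipArchiveFiles(List<File> shipFiles) {
    return shipFiles.stream()
            .allMatch(
                    file -> {
                        if (!file.isFile()) {
                            return false;
                        }
                        String name = file.getName().toLowerCase();
                        return name.endsWith(".tar.gz")
                                || name.endsWith(".tar")
                                || name.endsWith(".tgz")
                                || name.endsWith(".dst")
                                || name.endsWith(".jar")
                                || name.endsWith(".zip");
                    });
} {code}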
If Flink users want to add a directory as a ship file, they should use the configuration yarn.ship-files.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31749) The Using Hadoop OutputFormats example is not available for DataStream

2023-04-06 Thread junzhong qin (Jira)
junzhong qin created FLINK-31749:


 Summary: The Using Hadoop OutputFormats example is not available for DataStream
 Key: FLINK-31749
 URL: https://issues.apache.org/jira/browse/FLINK-31749
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.15.4, 1.17.0
Reporter: junzhong qin


The following example, which shows how to use Hadoop's {{TextOutputFormat}}, is from the doc: 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/formats/hadoop/#using-hadoop-outputformats
 . But {{DataStream}} has no {{output()}} method.
{code:java}
// Obtain the result we want to emit
DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...]

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  // create the Flink wrapper.
  new HadoopOutputFormat<Text, IntWritable>(
    // set the Hadoop OutputFormat and specify the job.
    new TextOutputFormat<Text, IntWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
hadoopResult.output(hadoopOF); {code}
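
One possible adaptation for the DataStream API (an untested sketch; it assumes that HadoopOutputFormat implements Flink's OutputFormat interface and that the deprecated writeUsingOutputFormat() is acceptable) would be:
{code:java}
// Sketch only: DataStream has no output(), but it still offers
// writeUsingOutputFormat(OutputFormat), and HadoopOutputFormat implements
// Flink's OutputFormat interface, so the doc example could end like this instead.
DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...]

HadoopOutputFormat<Text, IntWritable> hadoopOF =
    new HadoopOutputFormat<>(new TextOutputFormat<>(), job);
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat wrapped as a Flink OutputFormat.
hadoopResult.writeUsingOutputFormat(hadoopOF); {code}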



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33679) RestoreMode uses NO_CLAIM as default instead of LEGACY

2023-11-28 Thread junzhong qin (Jira)
junzhong qin created FLINK-33679:


 Summary: RestoreMode uses NO_CLAIM as default instead of LEGACY
 Key: FLINK-33679
 URL: https://issues.apache.org/jira/browse/FLINK-33679
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / State Backends
Reporter: junzhong qin


RestoreMode uses NO_CLAIM as default instead of LEGACY.
{code:java}
public enum RestoreMode implements DescribedEnum {
    CLAIM(
            "Flink will take ownership of the given snapshot. It will clean the"
                    + " snapshot once it is subsumed by newer ones."),
    NO_CLAIM(
            "Flink will not claim ownership of the snapshot files. However it will make sure it"
                    + " does not depend on any artefacts from the restored snapshot. In order to do that,"
                    + " Flink will take the first checkpoint as a full one, which means it might"
                    + " reupload/duplicate files that are part of the restored checkpoint."),
    LEGACY(
            "This is the mode in which Flink worked so far. It will not claim ownership of the"
                    + " snapshot and will not delete the files. However, it can directly depend on"
                    + " the existence of the files of the restored checkpoint. It might not be safe"
                    + " to delete checkpoints that were restored in legacy mode ");

    private final String description;

    RestoreMode(String description) {
        this.description = description;
    }

    @Override
    @Internal
    public InlineElement getDescription() {
        return text(description);
    }

    public static final RestoreMode DEFAULT = NO_CLAIM;
} {code}
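
For context, the restore mode can also be requested explicitly when resuming from a savepoint. A minimal sketch (class and option names such as SavepointConfigOptions in org.apache.flink.runtime.jobgraph are based on the 1.x codebase and should be double-checked against the docs):
{code:java}
// Sketch only, not part of the original report.
Configuration conf = new Configuration();
// Placeholder path to an existing savepoint.
conf.set(SavepointConfigOptions.SAVEPOINT_PATH, "hdfs:///savepoints/savepoint-xxxx");
// If execution.savepoint-restore-mode is not set, RestoreMode.DEFAULT (= NO_CLAIM)
// applies, not LEGACY - which is what the documentation should reflect.
conf.set(SavepointConfigOptions.RESTORE_MODE, RestoreMode.NO_CLAIM);

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf); {code}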



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34261) When slotmanager.redundant-taskmanager-num is set to a value greater than 1, redundant task managers may be repeatedly released and requested

2024-01-29 Thread junzhong qin (Jira)
junzhong qin created FLINK-34261:


 Summary: When slotmanager.redundant-taskmanager-num is set to a 
value greater than 1, redundant task managers may be repeatedly released and 
requested
 Key: FLINK-34261
 URL: https://issues.apache.org/jira/browse/FLINK-34261
 Project: Flink
  Issue Type: Bug
Reporter: junzhong qin
 Attachments: image-2024-01-29-17-29-15-453.png

Redundant task managers are extra task managers started by Flink to speed up job recovery in case of failures caused by a lost task manager. But when we configure
{code:java}
slotmanager.redundant-taskmanager-num: 2 // any value greater than 1{code}
Flink will release and request redundant TaskManagers repeatedly.

We can reproduce this situation with the [Flink Kubernetes Operator (using minikube here)|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/]. Here is an example YAML file:

 
{code:java}
# redundant-tm.yaml

#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: redundant-tm
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    slotmanager.redundant-taskmanager-num: "2"
    cluster.fine-grained-resource-management.enabled: "false"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 3
    upgradeMode: stateless
  logConfiguration:
    log4j-console.properties: |+
      rootLogger.level = DEBUG
      rootLogger.appenderRef.file.ref = LogFile
      rootLogger.appenderRef.console.ref = LogConsole
      appender.file.name = LogFile
      appender.file.type = File
      appender.file.append = false
      appender.file.fileName = ${sys:log.file}
      appender.file.layout.type = PatternLayout
      appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n{code}
After executing:
{code:java}
kubectl create -f redundant-tm.yaml
kubectl port-forward svc/redundant-tm 8081{code}
We can see the redundant TaskManagers being repeatedly released and requested in the JobManager's log:
{code:java}
2024-01-29 09:26:25,033 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Registering TaskManager with ResourceID redundant-tm-taskmanager-1-4 (pekko.tcp://flink@10.244.1.196:6122/user/rpc/taskmanager_0) at ResourceManager
2024-01-29 09:26:25,060 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Registering task executor redundant-tm-taskmanager-1-4 under 44c649b2d84e87cdd5e6c53971f8b877 at the slot manager.
2024-01-29 09:26:25,061 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker redundant-tm-taskmanager-1-4 is registered.
2024-01-29 09:26:25,061 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker redundant-tm-taskmanager-1-4 with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2} was requested in current attempt. Current pending count after registering: 1.
2024-01-29 09:26:25,196 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2}, numNeeded=3, unwantedWorkers=[]}].
2024-01-29 09:26:25,196 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - need release 1 workers, current worker number 4, declared worker number 3
2024-01-29 09:26:25,199 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResour

[jira] [Created] (FLINK-34525) When redundant task managers are enabled, it will still take a considerable amount of time to restart a job if a task manager is killed.

2024-02-26 Thread junzhong qin (Jira)
junzhong qin created FLINK-34525:


 Summary: When redundant task managers are enabled, it will still 
take a considerable amount of time to restart a job if a task manager is killed.
 Key: FLINK-34525
 URL: https://issues.apache.org/jira/browse/FLINK-34525
 Project: Flink
  Issue Type: Improvement
Reporter: junzhong qin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34526) Without shuffling data, when a Task Manager is killed, restarting the Flink job takes a considerable amount of time

2024-02-26 Thread junzhong qin (Jira)
junzhong qin created FLINK-34526:


 Summary: Without shuffling data, when a Task Manager is killed, 
restarting the Flink job takes a considerable amount of time
 Key: FLINK-34526
 URL: https://issues.apache.org/jira/browse/FLINK-34526
 Project: Flink
  Issue Type: Sub-task
Reporter: junzhong qin
 Attachments: image-2024-02-27-15-50-25-071.png, 
image-2024-02-27-15-50-39-337.png

In our test case, the pipeline is:

!image-2024-02-27-15-50-25-071.png!

parallelism = 100 and taskmanager.numberOfTaskSlots = 2
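
For reference, a rough sketch of such a pipeline (the operator names datagen_source and print_sink are taken from the job graph in the logs below; the actual generator implementation is an assumption, since it is not included in the report):
{code:java}
// Hypothetical reconstruction of the test pipeline, not the original job code.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class KillTaskManagerRestartTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100); // taskmanager.numberOfTaskSlots = 2 is set on the cluster

        env.addSource(new SourceFunction<Long>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Long> ctx) throws Exception {
                        long i = 0;
                        while (running) {
                            ctx.collect(i++); // unbounded generator standing in for datagen_source
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                .name("datagen_source")
                .print() // source and sink are chained, no data shuffle in between
                .name("print_sink");

        env.execute("redundant-tm-restart-test");
    }
} {code}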

The worker was killed at 2024-02-27 15:10:13,691:
{code:java}
2024-02-27 15:10:13,691 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker container_e2472_1706081484717_60538_01_50 is terminated. Diagnostics: Container container_e2472_1706081484717_60538_01_50 marked as failed. Exit code:137. Diagnostics:[2024-02-27 15:10:12.720]Container killed on request. Exit code is 137[2024-02-27 15:10:12.763]Container exited with a non-zero exit code 137. [2024-02-27 15:10:12.839]Killed by external signal 
{code}
It took about 20 seconds to restart the job.
{code:java}
2024-02-27 15:10:30,749 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: datagen_source[1] -> Sink: print_sink[2] (70/100) (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_0) switched from RUNNING to FAILED on container_e2472_1706081484717_60538_01_50 @ ip-10-169-18-98.idata-server.shopee.io (dataPort=38597).
org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id container_e2472_1706081484717_60538_01_50(ip-10-169-18-98.idata-server.shopee.io:5454) is no longer reachable.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1515) ~[flink-dist-shopee-1.17-SNAPSHOT.jar:shopee-1.17-SNAPSHOT]
    at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126) ~[flink-dist-shopee-1.17-SNAPSHOT.jar:shopee-1.17-SNAPSHOT]
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) ~[flink-dist-shopee-1.17-SNAPSHOT.jar:shopee-1.17-SNAPSHOT]
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267) ~[flink-dist-shopee-1.17-SNAPSHOT.jar:shopee-1.17-SNAPSHOT]
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262) ~[flink-dist-shopee-1.17-SNAPSHOT.jar:shopee-1.17-SNAPSHOT]
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248) ~[flink-dist-shopee-1.17-SNAPSHOT.jar:shopee-1.17-SNAPSHOT]

// Deploy and run task
2024-02-27 15:10:32,426 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: datagen_source[1] -> Sink: print_sink[2] (70/100) (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) switched from DEPLOYING to INITIALIZING.
2024-02-27 15:10:32,427 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: datagen_source[1] -> Sink: print_sink[2] (69/100) (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) switched from DEPLOYING to INITIALIZING.
2024-02-27 15:10:33,347 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: datagen_source[1] -> Sink: print_sink[2] (70/100) (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) switched from INITIALIZING to RUNNING.
2024-02-27 15:10:33,421 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: datagen_source[1] -> Sink: print_sink[2] (69/100) (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) switched from INITIALIZING to RUNNING. {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34528) With shuffling data, when a Task Manager is killed, restarting the Flink job takes a considerable amount of time

2024-02-27 Thread junzhong qin (Jira)
junzhong qin created FLINK-34528:


 Summary: With shuffling data, when a Task Manager is killed, 
restarting the Flink job takes a considerable amount of time
 Key: FLINK-34528
 URL: https://issues.apache.org/jira/browse/FLINK-34528
 Project: Flink
  Issue Type: Sub-task
Reporter: junzhong qin
 Attachments: image-2024-02-27-16-35-04-464.png

In the test case, the pipeline looks like:

!image-2024-02-27-16-35-04-464.png!

The Source: Custom Source generates strings, and the job keys them by the string value (keyBy) before they reach Sink: Unnamed; a minimal sketch of the pipeline follows the list below.
 # parallelism = 100
 # taskmanager.numberOfTaskSlots = 2
 # checkpointing disabled
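
A rough sketch of such a pipeline under the stated assumptions (the original Custom Source code is not part of the report, so the string generator below is hypothetical):
{code:java}
// Hypothetical reconstruction of the test pipeline, not the original job code.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class KillTaskManagerRestartWithShuffleTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100); // checkpointing is left disabled (nothing enabled here)

        DataStream<String> strings =
                env.addSource(new SourceFunction<String>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<String> ctx) throws Exception {
                        long i = 0;
                        while (running) {
                            ctx.collect("key-" + (i++ % 1000)); // string generator ("Custom Source")
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                });

        // keyBy forces a network shuffle between the source and the sink,
        // unlike the chained pipeline in FLINK-34526.
        strings.keyBy(value -> value).print();

        env.execute("kill-tm-restart-with-shuffle-test");
    }
} {code}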

The worker was killed at:
{code:java}
2024-02-27 16:41:49,982 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink: Unnamed (6/100) (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_5_0) switched from RUNNING to FAILED on container_e2472_1705993319725_62292_01_46 @ xxx (dataPort=38827).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'xxx/10.169.18.138:35983 [ container_e2472_1705993319725_62292_01_10(xxx:5454) ] '. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]{code}
The job took about 16 seconds to restart.
{code:java}
// The task was scheduled to a task manager that had already been killed
2024-02-27 16:41:53,506 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source (16/100) (attempt #3) with attempt id 2f1c7b22098a273f5471e3e8f794e1d3_bc764cd8ddf7a0cff126f51c16239658_15_3 and vertex id bc764cd8ddf7a0cff126f51c16239658_15 to container_e2472_1705993319725_62292_01_10 @ xxx (dataPort=35983) with allocation id 975dded4548ad15b36d0e5e6aac8f5b6

// The last task switched from INITIALIZING to RUNNING
2024-02-27 16:42:05,176 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (64/100) (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_63_14) switched from INITIALIZING to RUNNING. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)