Re: Job Recovery Time on TM Lost
Could you share the full logs with us for the second experiment, Lu? I cannot tell off the top of my head why it should take 30s unless you have configured a restart delay of 30s.

Let's discuss FLINK-23216 on the JIRA ticket, Gen. I've now implemented FLINK-23209 [1] but it somehow has the problem that in a flaky environment you might not want to mark a TaskExecutor dead on the first connection loss. Maybe this is something we need to make configurable (e.g. introducing a threshold, which admittedly is similar to the heartbeat timeout) so that the user can configure it for her environment. On the upside, if you mark the TaskExecutor dead on the first connection loss (assuming you have a stable network environment), then it can now detect lost TaskExecutors as fast as the heartbeat interval.

[1] https://issues.apache.org/jira/browse/FLINK-23209

Cheers,
Till

On Fri, Jul 2, 2021 at 9:33 AM Gen Luo wrote:
> Thanks for sharing, Till and Yang.
>
> @Lu
> Sorry but I don't know how to explain the new test with the log. Let's wait for others' reply.
>
> @Till
> It would be nice if the JIRAs could be fixed. Thanks again for proposing them.
>
> In addition, I was tracking an issue where the RM keeps allocating and freeing slots after a TM is lost until its heartbeat timeout, when I found the recovery costing as long as the heartbeat timeout. That should be a minor bug introduced by declarative resource management. I have created a JIRA about the problem [1] and we can discuss it there if necessary.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23216
>
> On Fri, Jul 2, 2021 at 3:13 AM, Lu Niu wrote:
>> Another side question: shall we add a metric to cover the complete restarting time (phase 1 + phase 2)? The current metric jm.restartingTime only covers phase 1. Thanks!
>>
>> Best
>> Lu
>>
>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu wrote:
>>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>>
>>> I did another test yesterday.
>>> In this test, I intentionally throw an exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>         && errorFrenquecyInMin > 0
>>>         && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>>>     lastStartTime = System.currentTimeMillis();
>>>     throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found phase 1 still takes about 30s and phase 2 dropped to 1s (because no container allocation is needed). Why does phase 1 still take 30s even though no TM is lost?
>>>
>>> Related logs:
>>> ```
>>> 2021-06-30 00:55:07,463 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>> 2021-06-30 00:55:07,509 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to RESTARTING.
>>> 2021-06-30 00:55:37,596 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to RUNNING.
>>> 2021-06-30 00:55:38,678 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - (time when all tasks switch from CREATED to RUNNING)
>>> ```
>>> Best
>>> Lu
>>>
>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu wrote:
Thanks Till and Yang for the help! Also thanks Till for the quick fix! I did another test yesterday.
In this test, I intentionally throw an exception from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
        && errorFrenquecyInMin > 0
        && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
    lastStartTime = System.currentTimeMillis();
    throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
}
```
In this case, I found phase 1 still takes about 30s and phase 2 dropped to 1s (because no container allocation is needed).

Some logs:
```
```

On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann wrote:
> A quick addition: I think with FLINK-23202 it should now also be possible to improve the heartbeat mechanism in the general case. We can leverage the unreachability exception thrown if a remote target is no longer reachable to mark a heartbeat target as no longer reachable [1]. This can then be considered as if the heartbeat timeout had been triggered. That way we should detect lost TaskExecutors as fast as our heartbeat interval is.
>
> [1]
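[Editor's note] If Till's guess about a configured restart delay is right, the 30s gap between RESTARTING and RUNNING in the logs above would correspond to a fixed-delay restart strategy like the following flink-conf.yaml fragment. This is a hypothetical configuration for illustration, not taken from Lu's actual setup:

```yaml
# Restart up to 3 times, waiting 30 s between attempts. The 30 s delay
# would show up exactly as the RESTARTING -> RUNNING gap in the logs.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 30 s
```

Checking the effective `restart-strategy.*` values in the job's configuration would confirm or rule this out.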
Re: [VOTE] Release 1.13.2, release candidate #1
+1 (binding)

* verified signatures and checksums
* reviewed the announcement PR
* built from sources and ran an example, quickly checked the Web UI
* checked the diff of pom.xml and NOTICE files from 1.13.1:
  o commons-io updated
  o bundled guava:failureaccess added in flink-sql-connector-kinesis, which is properly reflected in the NOTICE file

Best,
Dawid

On 01/07/2021 12:57, Yun Tang wrote:
> Hi everyone,
> Please review and vote on the release candidate #1 for the version 1.13.2, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be deployed to dist.apache.org [2], which are signed with the key with fingerprint 78A306590F1081CC6794DC7F62DAD618E07CF996 [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.13.2-rc1" [5],
> * website pull request listing the new release and adding the announcement blog post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes.
>
> Best,
> Yun Tang
>
> [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218==12315522
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.2-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1429/
> [5] https://github.com/apache/flink/releases/tag/release-1.13.2-rc1
> [6] https://github.com/apache/flink-web/pull/453
[jira] [Created] (FLINK-23224) Support timestamp_ltz type in hive connector
Rui Li created FLINK-23224: -- Summary: Support timestamp_ltz type in hive connector Key: FLINK-23224 URL: https://issues.apache.org/jira/browse/FLINK-23224 Project: Flink Issue Type: New Feature Components: Connectors / Hive Reporter: Rui Li -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23223) When flushAlways is enabled the subpartition may lose notification of data availability
Yun Gao created FLINK-23223: --- Summary: When flushAlways is enabled the subpartition may lose notification of data availability Key: FLINK-23223 URL: https://issues.apache.org/jira/browse/FLINK-23223 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Yun Gao

When flushAlways is enabled (namely the buffer timeout is set to 0), there might be cases like:
# The subpartition emits an event which blocks the channel.
# The subpartition produces more records. However, these records would not be notified since isBlocked = true.
# When the downstream task resumes the subpartition later, the subpartition would only mark isBlocked as false. For local input channels, although it tries to add the channel if isAvailable = true, this check would not pass since flushRequest = true.

One case for this issue is https://issues.apache.org/jira/browse/FLINK-22085, which uses LocalInputChannel.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
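[Editor's note] The three steps above can be sketched as a toy model. All class and field names below are illustrative only; this is not Flink's actual PipelinedSubpartition code, just a sketch of the suspected logic that loses the availability notification:

```java
// Toy model of the suspected race: resuming consumption only clears the
// isBlocked flag, and the data that arrived while blocked never triggers
// a notification because flushRequested is already true.
public class SubpartitionToy {
    boolean isBlocked;        // set when a blocking event is emitted
    boolean flushRequested;   // set on every record when flushAlways is on
    int notifications;        // availability notifications delivered

    void emitBlockingEvent() {
        isBlocked = true;
    }

    void emitRecord() {
        // flushAlways: a flush is requested for each record, but the
        // notification is skipped while the channel is blocked.
        if (!flushRequested && !isBlocked) {
            notifications++;
        }
        flushRequested = true;
    }

    void resumeConsumption() {
        // Suspected bug: only the flag is cleared; no notification is
        // issued for the records that arrived while blocked.
        isBlocked = false;
    }

    public static void main(String[] args) {
        SubpartitionToy p = new SubpartitionToy();
        p.emitBlockingEvent();
        p.emitRecord();        // record arrives while blocked -> no notification
        p.resumeConsumption();
        p.emitRecord();        // still none: flushRequested is already true
        System.out.println("notifications = " + p.notifications); // prints 0
    }
}
```

The downstream channel is thus never told that data is pending, matching the "lose notification of data availability" symptom.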
[jira] [Created] (FLINK-23222) Translate page 'Application Profiling & Debugging' of 'Operations/Debugging' into Chinese
pierrexiong created FLINK-23222: --- Summary: Translate page 'Application Profiling & Debugging' of 'Operations/Debugging' into Chinese Key: FLINK-23222 URL: https://issues.apache.org/jira/browse/FLINK-23222 Project: Flink Issue Type: Improvement Components: chinese-translation, Documentation Affects Versions: 1.13.1 Reporter: pierrexiong * The markdown file location: flink/docs/content.zh/docs/ops/debugging/application_profiling.zh.md * The page url is: [https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/ops/debugging/application_profiling] * Related issue: https://issues.apache.org/jira/browse/FLINK-19036?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20chinese-translation%20AND%20text%20~%20%22Application%20Profiling%22 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23221) Docker image vulnerability
Razvan AGAPE created FLINK-23221: Summary: Docker image vulnerability Key: FLINK-23221 URL: https://issues.apache.org/jira/browse/FLINK-23221 Project: Flink Issue Type: Improvement Components: flink-docker Affects Versions: 1.13.1 Environment: Issue was discovered by AWS ECR image scanning on apache/flink:1.13.1-scala_2.12 Reporter: Razvan AGAPE

The AWS ECR image scanning reports some HIGH vulnerabilities on the apache/flink:1.13.1-scala_2.12 docker image. In addition, all versions prior to this one have these issues. The vulnerabilities are the following:
# [CVE-2021-33574|https://security-tracker.debian.org/tracker/CVE-2021-33574]
# [CVE-2019-25013 - for this one a patch has been released in glibc version 2.31-9|https://security-tracker.debian.org/tracker/CVE-2019-25013]

Our security policy does not allow us to deploy images having security vulnerabilities. Searching through the Internet, I found that for the first problem a patch containing the solution will be released this year. Do you plan to release a new image containing the newer glibc version in order to solve these issues? Also, I checked and the Alpine-based Flink images do not have these vulnerabilities. Do you plan to release newer versions of Flink based on Alpine (the latest one is flink:1.8.x)?

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23220) MailboxProcessor should wait for default action available even if there are no mails
Dawid Wysakowicz created FLINK-23220: Summary: MailboxProcessor should wait for default action available even if there are no mails Key: FLINK-23220 URL: https://issues.apache.org/jira/browse/FLINK-23220 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: Dawid Wysakowicz Fix For: 1.14.0 See the discussion: https://github.com/apache/flink/pull/15055#issuecomment-872827247 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23219) temporal join TTL configuration does not take effect
waywtdcc created FLINK-23219: Summary: temporal join TTL configuration does not take effect Key: FLINK-23219 URL: https://issues.apache.org/jira/browse/FLINK-23219 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Runtime Affects Versions: 1.12.2 Reporter: waywtdcc Attachments: image-2021-07-02-16-29-40-310.png

* version: Flink 1.12.2
* problem: I run a job that temporal left joins table A with table B, and set the table.exec.state.ttl configuration to 3 hours (or 2 seconds for testing). But the task state keeps growing for more than 7 days.
* code:
```
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(2));

tableEnv.executeSql("drop table if exists persons_table_kafka2");
String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
    " `id` BIGINT,\n" +
    " `name` STRING,\n" +
    " `age` INT,\n" +
    " proctime as PROCTIME(),\n" +
    " `ts` TIMESTAMP(3),\n" +
    " WATERMARK FOR ts AS ts\n" +
    ") WITH (\n" +
    " 'connector' = 'kafka',\n" +
    " 'topic' = 'persons_test_auto',\n" +
    " 'properties.bootstrap.servers' = 'node2:6667',\n" +
    " 'properties.group.id' = 'testGrodsu1765',\n" +
    " 'scan.startup.mode' = 'group-offsets',\n" +
    " 'format' = 'json'\n" +
    ")";
tableEnv.executeSql(kafka_source_sql);

tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
    " `id` BIGINT,\n" +
    " `name` STRING,\n" +
    " `message` STRING,\n" +
    " `ts` TIMESTAMP(3) ," +
    // " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
    " WATERMARK FOR ts AS ts\n" +
    ") WITH (\n" +
    " 'connector' = 'kafka',\n" +
    " 'topic' = 'persons_extra_message_auto',\n" +
    " 'properties.bootstrap.servers' = 'node2:6667',\n" +
    " 'properties.group.id' = 'testGroud125313',\n" +
    " 'scan.startup.mode' = 'group-offsets',\n" +
    " 'format' = 'json'\n" +
    ")";
tableEnv.executeSql(kafka_source_sql2);

tableEnv.executeSql(
    "CREATE TEMPORARY VIEW persons_message_table22 AS \n" +
    "SELECT id, name, message, ts \n" +
    " FROM (\n" +
    " SELECT
 *,\n" +
    " ROW_NUMBER() OVER (PARTITION BY name \n" +
    " ORDER BY ts DESC) AS rowNum \n" +
    " FROM persons_message_table_kafka2 " +
    " )\n" +
    "WHERE rowNum = 1");

tableEnv.executeSql("" +
    "CREATE TEMPORARY VIEW result_data_view " +
    " as " +
    " select " +
    " t1.name, t1.age, t1.ts as ts1, t2.message, cast (t2.ts as string) as ts2 " +
    " from persons_table_kafka2 t1 " +
    " left join persons_message_table22 FOR SYSTEM_TIME AS OF t1.ts AS t2 on t1.name = t2.name ");
```
* the result looks like: !image-2021-07-02-16-29-40-310.png!

-- This message was sent by Atlassian Jira (v8.3.4#803005)
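[Editor's note] For context, idle state retention as configured in the report is expected to drop state entries that have not been accessed within the TTL. Here is a minimal stdlib sketch of that expected behavior; the names are made up for illustration, and whether Flink's temporal-join state actually honors this setting is exactly what this ticket questions:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Minimal sketch of TTL-based idle state retention: entries whose last
// access is older than the TTL are evicted on cleanup. Not Flink code.
public class TtlStateSketch {
    static final class Entry {
        final String value;
        final long lastAccess;
        Entry(String value, long lastAccess) {
            this.value = value;
            this.lastAccess = lastAccess;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> state = new HashMap<>();

    TtlStateSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(String key, String value, long now) {
        state.put(key, new Entry(value, now));
    }

    // Drop every entry that has been idle longer than the TTL.
    void cleanup(long now) {
        for (Iterator<Map.Entry<String, Entry>> it = state.entrySet().iterator(); it.hasNext(); ) {
            if (now - it.next().getValue().lastAccess > ttlMillis) {
                it.remove();
            }
        }
    }

    int size() {
        return state.size();
    }

    public static void main(String[] args) {
        TtlStateSketch s = new TtlStateSketch(2000); // 2 s TTL, as in the report
        s.put("alice", "msg1", 0);
        s.put("bob", "msg2", 1500);
        s.cleanup(3000); // "alice" idle for 3000 ms > 2000 ms -> evicted
        System.out.println("entries = " + s.size()); // prints 1
    }
}
```

If the join state in the report behaved like this sketch, it would stop growing after the TTL; the reported 7-day growth suggests the temporal-join state is not subject to this cleanup.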
[jira] [Created] (FLINK-23218) Distribute the ShuffleDescriptors via blob server
Zhilong Hong created FLINK-23218: Summary: Distribute the ShuffleDescriptors via blob server Key: FLINK-23218 URL: https://issues.apache.org/jira/browse/FLINK-23218 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhilong Hong Fix For: 1.14.0

h3. Introduction

The optimizations introduced in FLINK-21110 have so far improved the performance of job initialization, failover and partition releasing. However, task deployment is still slow. For a job with two vertices, where each vertex has 8k parallelism and they are connected with an all-to-all edge, it takes 32.611s to deploy all the tasks and make them transition to running. If the parallelisms are 16k, it may take more than 2 minutes. As the creation of TaskDeploymentDescriptors runs in the main thread of the jobmanager, this means that the jobmanager cannot deal with other akka messages like heartbeats, task status updates, etc. for more than two minutes.

All in all, there are currently two issues in the deployment of tasks for large-scale jobs:
# It takes a long time to deploy tasks, especially for all-to-all edges.
# Heartbeat timeouts may happen during or after the procedure of task deployment. For a streaming job, this would cause the failover of the entire region. The job may never transition to running since there would be another heartbeat timeout during the procedure of the new task deployments.

h3. Proposal

Task deployment involves the following procedures:
# The jobmanager creates a TaskDeploymentDescriptor for each task in the main thread
# The TaskDeploymentDescriptor is serialized in the future executor
# Akka transports the TaskDeploymentDescriptors to the TaskExecutors via RPC calls
# The TaskExecutors create a new task thread and execute it

The optimization contains two parts:

*1. Cache the compressed serialized value of ShuffleDescriptors*

ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the IntermediateResultPartitions that a task consumes.
For the downstream vertices connected with an all-to-all edge that has _N_ parallelism, we need to calculate _N_ ShuffleDescriptors _N_ times. However, these vertices share the same ShuffleDescriptors since they all consume the same IntermediateResultPartitions. We don't need to calculate the ShuffleDescriptors for each downstream vertex individually; we can just cache them. This will decrease the overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to O(N). Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ times, so we can cache the serialized value of the ShuffleDescriptors instead of the original objects. To decrease the size of akka messages and reduce the transmission of replicated data over the network, these serialized values can be compressed.

*2. Distribute the ShuffleDescriptors via blob server*

For ShuffleDescriptors of vertices with 8k parallelism, the size of their serialized value is more than 700 kilobytes. After compression, it would be about 200 kilobytes. The overall size of 8k TaskDeploymentDescriptors is more than 1.6 gigabytes. Since Akka cannot send the messages as fast as the TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors become a heavy burden for the garbage collector to deal with.

In the TaskDeploymentDescriptor, JobInformation and TaskInformation are distributed via the blob server if their sizes exceed a certain threshold (which is defined as {{blob.offload.minsize}}). TaskExecutors request the information from the blob server once they begin to process the TaskDeploymentDescriptor. This makes sure that the jobmanager doesn't need to keep all the copies in heap memory until the TaskDeploymentDescriptors are all sent; there will be only one copy on the blob server. Like the JobInformation, we can distribute the cached ShuffleDescriptors via the blob server if their overall size exceeds the threshold.

h3.
Summary

In summary, the optimization of task deployment is to introduce a cache for the TaskDeploymentDescriptor. We cache the compressed serialized value of the ShuffleDescriptors. If the size of the value exceeds a certain threshold, the value is distributed via the blob server.

h3. Comparison

We implemented a POC and conducted an experiment to compare the performance of our optimization. We chose a streaming job for the experiment because no task will be running until all tasks are deployed; this avoids other disturbing factors. The job contains two vertices: a source and a sink, connected with an all-to-all edge. The results illustrated below are the time intervals between the timestamp of the first task that transitions to _deploying_ and the timestamp of the last task that transitions to _running_:

||Parallelism||Before||After ||
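[Editor's note] The caching idea in part 1 can be sketched with plain JDK classes. All names here are illustrative, not Flink's actual ShuffleDescriptor or cache API: serialize and compress the descriptor list once per intermediate result, then reuse the cached bytes for every consumer, so the work is O(N) instead of O(N^2):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.Deflater;

// Sketch of caching one compressed serialized blob per intermediate
// result, shared by all downstream consumers.
public class DescriptorBlobCache {
    private final Map<String, byte[]> cache = new HashMap<>();
    int serializations; // counts how often we actually serialize

    byte[] getOrCompute(String resultId, List<? extends Serializable> descriptors) {
        return cache.computeIfAbsent(resultId, id -> {
            serializations++;
            return compress(serialize(descriptors));
        });
    }

    private static byte[] serialize(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static byte[] compress(byte[] raw) {
        Deflater deflater = new Deflater();
        deflater.setInput(raw);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    public static void main(String[] args) {
        DescriptorBlobCache cache = new DescriptorBlobCache();
        List<String> descriptors = List.of("partition-1", "partition-2");
        // N consumers of the same result reuse one cached blob.
        byte[] first = cache.getOrCompute("result-A", descriptors);
        for (int consumer = 0; consumer < 8000; consumer++) {
            byte[] again = cache.getOrCompute("result-A", descriptors);
            assert again == first; // same cached bytes, no re-serialization
        }
        System.out.println("serializations = " + cache.serializations); // prints 1
    }
}
```

In the proposal, the cached compressed bytes would additionally be offloaded to the blob server when they exceed the configured threshold, so the jobmanager keeps a key instead of N copies of the payload.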
Re: [DISCUSS] Feedback Collection Jira Bot
+1 for the unassignment remark from Stephan

Piotrek

On Thu, Jul 1, 2021 at 12:35, Stephan Ewen wrote:
> It is true that the bot surfaces problems that are there (not enough committer attention sometimes), but it also "rubs salt in the wound" of contributors, and that is tricky.
>
> We can try it out with the extended periods (although I think that in reality we probably need even longer periods) and see how it goes.
>
> One thing I would suggest is to never let the bot unassign issues. It just strikes me as very cold and disrespectful to be unassigned by a bot from an issue in which I invested time and energy. (The committers don't even take the time to talk to me and explain why the contribution will not go forward.)
> Unassignment should come from another person, possibly in response to a ping from the bot. I think that makes a big difference in contributor treatment.
>
> On Wed, Jun 30, 2021 at 12:30 PM Till Rohrmann wrote:
> > I agree that we shouldn't discourage contributions.
> >
> > For me the main idea of the bot is not to clean up the JIRA but to improve our communication and expectation management with the community. There are many things we could do, but for a lot of things we don't have the time and capacity. Then to say at some point that we won't do something is just being honest. This also shows when looking at the JIRA numbers of the merged commits. We very rarely resolve tickets which are older than x days, and if we do, we usually create a new ticket for the problem.
> >
> > The fact that we see some tickets with available pull requests go stale is a symptom that we don't value them as important enough or allocate enough time for external contributions, imo. Otherwise, they would have gotten the required attention and been merged.
> > In such a case, raising awareness by pinging the watchers of the respective ticket is probably better than silently ignoring the PR. Also, adding labels to filter for these PRs should help to get them the required attention. But also here, it happens very rarely that we actually merge a PR that is older than y days. Ideally we avoid this situation altogether by only assigning contributors to tickets for which a committer has review capacity. However, this does not seem to always work.
> >
> > In some sense, the JIRA bot shows us the things which fall through the cracks more explicitly (which is probably not different than before). Of course we should try to find the time periods for when to ping or de-prioritize tickets that work best for the community.
> >
> > +1 for the proposed changes (extended time periods, "Not a Priority", default priority and fixVersion).
> >
> > @Piotr, I think we have the priorities defined here [1]. Maybe it is enough to share the link so that everyone can check whether her assumptions are correct.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Jira+Process
> >
> > Cheers,
> > Till
> >
> > On Wed, Jun 30, 2021 at 10:59 AM Piotr Nowojski wrote:
> > > > * Introduce "Not a Priority" priority and stop closing tickets.
> > >
> > > +1 for this one (I also like the name you proposed for this, Konstantin)
> > >
> > > I also have no objections to the other proposals that you summarised. Just a remark that, independently of this discussion, we might want to revisit or reconfirm the priorities and their definition/interpretation across all contributors.
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Wed, Jun 30, 2021 at 10:15 Konstantin Knauf wrote:
> > > > Hi everyone,
> > > >
> > > > Thank you for the additional comments and suggestions.
> > > > @Stephan, Kurt: I agree that we shouldn't discourage or dishearten contributors, and probably 14 days until a ticket becomes "stale-assigned" are too few. That's why I've already proposed to increase that to 30 days. Similarly, the times for Major/Critical tickets can be increased. From my perspective, the root causes are the following:
> > > >
> > > > * Tickets are opened with the wrong priority (see https://cwiki.apache.org/confluence/display/FLINK/Flink+Jira+Process#FlinkJiraProcess-TicketsPriorities). Here it might help to change the default priority.
> > > > * Committers don't have the time to review tickets or don't bring community contributions to a resolution. The Jira bot makes this fact more visible. Without the Jira bot, no external contributor would get more attention, and no external contribution would be merged faster. Ideally, it'd be the opposite, and committers would actively monitor tickets with the labels "stale-assigned" and "pull-request-available" in order to review those with priority. That's also why I am not a fan
Re: Job Recovery Time on TM Lost
Thanks for sharing, Till and Yang.

@Lu
Sorry but I don't know how to explain the new test with the log. Let's wait for others' reply.

@Till
It would be nice if the JIRAs could be fixed. Thanks again for proposing them.

In addition, I was tracking an issue where the RM keeps allocating and freeing slots after a TM is lost until its heartbeat timeout, when I found the recovery costing as long as the heartbeat timeout. That should be a minor bug introduced by declarative resource management. I have created a JIRA about the problem [1] and we can discuss it there if necessary.

[1] https://issues.apache.org/jira/browse/FLINK-23216

On Fri, Jul 2, 2021 at 3:13 AM, Lu Niu wrote:
> Another side question: shall we add a metric to cover the complete restarting time (phase 1 + phase 2)? The current metric jm.restartingTime only covers phase 1. Thanks!
>
> Best
> Lu
>
> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu wrote:
>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw an exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>         && errorFrenquecyInMin > 0
>>         && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>>     lastStartTime = System.currentTimeMillis();
>>     throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, I found phase 1 still takes about 30s and phase 2 dropped to 1s (because no container allocation is needed). Why does phase 1 still take 30s even though no TM is lost?
>>
>> Related logs:
>> ```
>> 2021-06-30 00:55:07,463 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>> 2021-06-30 00:55:07,509 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to RESTARTING.
>> 2021-06-30 00:55:37,596 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to RUNNING.
>> 2021-06-30 00:55:38,678 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - (time when all tasks switch from CREATED to RUNNING)
>> ```
>> Best
>> Lu
>>
>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu wrote:
>>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally throw an exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>         && errorFrenquecyInMin > 0
>>>         && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>>>     lastStartTime = System.currentTimeMillis();
>>>     throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found phase 1 still takes about 30s and phase 2 dropped to 1s (because no need for container allocation).
>>>
>>> Some logs:
>>> ```
>>> ```
>>>
>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann wrote:
A quick addition: I think with FLINK-23202 it should now also be possible to improve the heartbeat mechanism in the general case. We can leverage the unreachability exception thrown if a remote target is no longer reachable to mark a heartbeat target as no longer reachable [1]. This can then be considered as if the heartbeat timeout had been triggered. That way we should detect lost TaskExecutors as fast as our heartbeat interval is.
[1] https://issues.apache.org/jira/browse/FLINK-23209

Cheers,
Till

On Thu, Jul 1, 2021 at 1:46 PM Yang Wang wrote:
> Since you are deploying Flink workloads on Yarn, the Flink ResourceManager should get the container completion event after the heartbeats of Yarn NM -> Yarn RM -> Flink RM, which take 8 seconds by default. And the Flink ResourceManager will release the dead TaskManager container once it receives the completion event. As a result, Flink will not deploy tasks onto the dead TaskManagers.
>
> I think most of the time cost in phase 1 might be cancelling the tasks on the dead TaskManagers.
>
> Best,
> Yang
>
> On Thu, Jul 1, 2021 at 4:49 PM, Till Rohrmann wrote:
>> The analysis of Gen is correct. Flink currently uses its heartbeat as the primary means to detect dead TaskManagers. This means that Flink will take at least `heartbeat.timeout` time before the system recovers. Even if the cancellation happens fast (e.g. by
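[Editor's note] The configurable connection-loss threshold Till floats in his reply could look roughly like the sketch below. The names (e.g. `lossThreshold`) are made up for illustration and this is not the actual FLINK-23209 implementation: with a threshold of 1, a TaskExecutor is declared dead on the first connection loss (fast detection in stable networks), while a higher threshold tolerates flaky networks at the cost of slower detection.

```java
// Sketch of a configurable "consecutive connection losses before dead"
// policy for a heartbeat target. Names are illustrative, not Flink's.
public class HeartbeatTargetSketch {
    private final int lossThreshold; // losses needed to mark the target dead
    private int consecutiveLosses;
    private boolean markedDead;

    HeartbeatTargetSketch(int lossThreshold) {
        this.lossThreshold = lossThreshold;
    }

    void onConnectionLoss() {
        if (!markedDead && ++consecutiveLosses >= lossThreshold) {
            markedDead = true; // treated like an ordinary heartbeat timeout
        }
    }

    void onHeartbeatSuccess() {
        consecutiveLosses = 0; // the target proved reachable again
    }

    boolean isMarkedDead() {
        return markedDead;
    }

    public static void main(String[] args) {
        // Stable network: the first loss is decisive.
        HeartbeatTargetSketch strict = new HeartbeatTargetSketch(1);
        strict.onConnectionLoss();
        assert strict.isMarkedDead();

        // Flaky network: tolerate two losses, declare dead on the third.
        HeartbeatTargetSketch lenient = new HeartbeatTargetSketch(3);
        lenient.onConnectionLoss();
        lenient.onHeartbeatSuccess(); // recovered, counter resets
        lenient.onConnectionLoss();
        lenient.onConnectionLoss();
        assert !lenient.isMarkedDead();
        lenient.onConnectionLoss();
        assert lenient.isMarkedDead();
        System.out.println("ok");
    }
}
```

As Till notes, such a threshold overlaps conceptually with the heartbeat timeout; the trade-off is detection latency versus false positives on flaky networks.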
Re: [ANNOUNCE] Criteria for merging pull requests is updated
It seems disabling the merge button was only proposed during the release testing phase, which IMO doesn't mean we can't use it forever. Best, Kurt On Fri, Jul 2, 2021 at 3:01 PM Xintong Song wrote: > It was part of the draft proposed in this mail [1]. And before that, it was > brought up several times in both ML discussions [2][3] and IIRC offline > release syncs. > > If that is too implicit and there are objections, I'm open to keeping on > that discussion. > > Thank you~ > > Xintong Song > > > [1] > > https://lists.apache.org/thread.html/r09c4b8a03bc431adb5d7eaa17cb8e849f16da7a802b20798f32235cc%40%3Cdev.flink.apache.org%3E > [2] > > https://lists.apache.org/thread.html/r76e1cdba577c6f4d6c86b23fdaeb53c4e3744c20d0b3e850fc2e14a7%40%3Cdev.flink.apache.org%3E > [3] > > https://lists.apache.org/thread.html/r25ed92303cdefe41cdcc2935c2b06040b1bc7590ded01a26506a1e49%40%3Cdev.flink.apache.org%3E > > On Fri, Jul 2, 2021 at 2:19 PM Chesnay Schepler > wrote: > > > > - SHOULD NOT use the GitHub UI to merge PRs > > > > Where was this discussed? > > > > > > On 7/2/2021 6:59 AM, Xintong Song wrote: > > > Hi Flink committers, > > > > > > As previously discussed [1], the criteria for merging pull requests has > > > been updated. > > > > > > A full version of guidelines can be found on the project wiki [2]. The > > > following are some of the highlights. > > > - MUST make sure passing the CI tests before merging PRs > > > - SHOULD NOT use the GitHub UI to merge PRs > > > - For frequent test instabilities that are temporarily disabled, the > > > corresponding JIRA tickets must be made BLOCKER > > > > > > I'd like to kindly ask all Flink committers to please read through the > > new > > > guidelines and merge PRs accordingly. 
> > > > > > Thank you~ > > > > > > Xintong Song > > > > > > > > > [1] > > > > > > https://lists.apache.org/thread.html/r136028559a23e21edf16ff9eba6c481f68b4154c6454990ee89af6e2%40%3Cdev.flink.apache.org%3E > > > > > > [2] > > https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests > > > > > > > >
[jira] [Created] (FLINK-23217) Support StreamExecValues json serialization/deserialization
godfrey he created FLINK-23217: -- Summary: Support StreamExecValues json serialization/deserialization Key: FLINK-23217 URL: https://issues.apache.org/jira/browse/FLINK-23217 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: godfrey he Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [ANNOUNCE] Criteria for merging pull requests is updated
It was part of the draft proposed in this mail [1]. And before that, it was brought up several times in both ML discussions [2][3] and IIRC offline release syncs. If that is too implicit and there are objections, I'm open to keeping on that discussion. Thank you~ Xintong Song [1] https://lists.apache.org/thread.html/r09c4b8a03bc431adb5d7eaa17cb8e849f16da7a802b20798f32235cc%40%3Cdev.flink.apache.org%3E [2] https://lists.apache.org/thread.html/r76e1cdba577c6f4d6c86b23fdaeb53c4e3744c20d0b3e850fc2e14a7%40%3Cdev.flink.apache.org%3E [3] https://lists.apache.org/thread.html/r25ed92303cdefe41cdcc2935c2b06040b1bc7590ded01a26506a1e49%40%3Cdev.flink.apache.org%3E On Fri, Jul 2, 2021 at 2:19 PM Chesnay Schepler wrote: > > - SHOULD NOT use the GitHub UI to merge PRs > > Where was this discussed? > > > On 7/2/2021 6:59 AM, Xintong Song wrote: > > Hi Flink committers, > > > > As previously discussed [1], the criteria for merging pull requests has > > been updated. > > > > A full version of guidelines can be found on the project wiki [2]. The > > following are some of the highlights. > > - MUST make sure passing the CI tests before merging PRs > > - SHOULD NOT use the GitHub UI to merge PRs > > - For frequent test instabilities that are temporarily disabled, the > > corresponding JIRA tickets must be made BLOCKER > > > > I'd like to kindly ask all Flink committers to please read through the > new > > guidelines and merge PRs accordingly. > > > > Thank you~ > > > > Xintong Song > > > > > > [1] > > > https://lists.apache.org/thread.html/r136028559a23e21edf16ff9eba6c481f68b4154c6454990ee89af6e2%40%3Cdev.flink.apache.org%3E > > > > [2] > https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests > > > >
[jira] [Created] (FLINK-23216) RM keeps allocating and freeing slots after a TM lost until its heartbeat timeout
Gen Luo created FLINK-23216:
---
Summary: RM keeps allocating and freeing slots after a TM lost until its heartbeat timeout
Key: FLINK-23216
URL: https://issues.apache.org/jira/browse/FLINK-23216
Project: Flink
Issue Type: Bug
Affects Versions: 1.13.1
Reporter: Gen Luo

In Flink 1.13, it's observed that the ResourceManager keeps allocating and freeing slots with a new TM when it's notified by YARN that a TM is lost. The behavior continues until the JM marks the TM as FAILED when its heartbeat timeout is reached. It can be easily reproduced by enlarging akka.ask.timeout and heartbeat.timeout, for example to 10 min.

After tracking, we find the procedure should be like this:
1. When a TM is killed, YARN first receives the event and notifies the RM.
2. In Flink 1.13, the RM uses declarative resource management to manage the slots. It finds a lack of resources when receiving the notification, and then requests a new TM from YARN.
3. The RM then requires the new TM to connect and offer slots to the JM.
4. But from the JM's point of view, all slots are fulfilled, since the lost TM is not considered disconnected until the heartbeat timeout is reached, so the JM rejects all slot offers.
5. The new TM finds it has no slots serving the JM, and disconnects from the JM.
6. The RM then finds a lack of resources again and goes back to step 3, requiring the new TM to connect and offer slots to the JM, but it won't request another new TM from YARN.

The original log is lost but looks like this:

o.a.f.r.r.s.DefaultSlotStatusSyncer - Freeing slot xxx.
...(repeated several lines for different slots)...
o.a.f.r.r.s.DefaultSlotStatusSyncer - Starting allocation of slot xxx from container_xxx for job xxx.
...(repeated several lines for different slots)...

This could be fixed in several ways, such as notifying the JM as soon as the RM receives a TM lost notification, or having TMs not offer slots until required. But all these ways have side effects, so they may need further discussion.
Besides, this should no longer be an issue after FLINK-23209 is done.
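The allocate/reject/free loop described in the ticket can be sketched as a toy simulation. This is illustrative only, not Flink code; the class, method, and the tick-based timeout are all made up for the sketch:

```java
public class SlotLoopSketch {
    // Stands in for heartbeat.timeout, measured in abstract "ticks".
    static final int HEARTBEAT_TIMEOUT = 5;

    // Returns how many slot allocation attempts the RM makes before the JM
    // finally accepts an offer from the replacement TM.
    static int allocationsUntilAccepted(int maxTicks) {
        // From the JM's view, the lost TM still fulfills all slots until
        // the heartbeat timeout marks it FAILED.
        boolean jmSlotsFulfilled = true;
        int allocations = 0;
        for (int t = 0; t < maxTicks; t++) {
            if (t >= HEARTBEAT_TIMEOUT) {
                jmSlotsFulfilled = false; // heartbeat timeout: lost TM marked FAILED
            }
            allocations++; // RM sees missing resources, asks the new TM to offer slots
            if (jmSlotsFulfilled) {
                continue; // JM rejects the offer; RM frees the slot and retries
            }
            return allocations; // JM accepts the offer; recovery proceeds
        }
        return allocations;
    }

    public static void main(String[] args) {
        // With a 5-tick timeout, the RM churns through 6 allocation attempts.
        System.out.println(allocationsUntilAccepted(10));
    }
}
```

The point of the sketch is that the number of wasted allocations grows with the heartbeat timeout, which is why enlarging heartbeat.timeout makes the churn easy to observe.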
[RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
Hi there,

Since the voting time of FLIP-147 [1] has passed, I'm closing the vote now. There were seven +1 votes (6 of 7 binding) and no -1 votes:

- Dawid Wysakowicz (binding)
- Piotr Nowojski (binding)
- Jiangang Liu (binding)
- Arvid Heise (binding)
- Jing Zhang (binding)
- Leonard Xu (non-binding)
- Guowei Ma (binding)

Thus I'm happy to announce that the update to FLIP-147 is accepted. Many thanks, everyone!

Best,
Yun

[1] https://cwiki.apache.org/confluence/x/mw-ZCQ
[jira] [Created] (FLINK-23215) flink-table-code-splitter: NOTICE should in META-INF
Jingsong Lee created FLINK-23215:
Summary: flink-table-code-splitter: NOTICE should in META-INF
Key: FLINK-23215
URL: https://issues.apache.org/jira/browse/FLINK-23215
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Runtime
Reporter: Jingsong Lee
Assignee: Caizhi Weng
Fix For: 1.14.0
[jira] [Created] (FLINK-23214) Make ShuffleMaster a cluster level shared service
Yingjie Cao created FLINK-23214:
---
Summary: Make ShuffleMaster a cluster level shared service
Key: FLINK-23214
URL: https://issues.apache.org/jira/browse/FLINK-23214
Project: Flink
Issue Type: Sub-task
Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Yingjie Cao

This ticket tries to make ShuffleMaster a cluster level shared service, which makes it consistent with the ShuffleEnvironment at the TM side.
Re: [ANNOUNCE] Criteria for merging pull requests is updated
> - SHOULD NOT use the GitHub UI to merge PRs

Where was this discussed?

On 7/2/2021 6:59 AM, Xintong Song wrote:
> Hi Flink committers,
>
> As previously discussed [1], the criteria for merging pull requests has been updated.
>
> A full version of guidelines can be found on the project wiki [2]. The following are some of the highlights.
> - MUST make sure passing the CI tests before merging PRs
> - SHOULD NOT use the GitHub UI to merge PRs
> - For frequent test instabilities that are temporarily disabled, the corresponding JIRA tickets must be made BLOCKER
>
> I'd like to kindly ask all Flink committers to please read through the new guidelines and merge PRs accordingly.
>
> Thank you~
>
> Xintong Song
>
> [1] https://lists.apache.org/thread.html/r136028559a23e21edf16ff9eba6c481f68b4154c6454990ee89af6e2%40%3Cdev.flink.apache.org%3E
> [2] https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests