Re: Job Recovery Time on TM Lost

2021-07-02 Thread Till Rohrmann
Could you share the full logs with us for the second experiment, Lu? I
cannot tell off the top of my head why it should take 30s unless you have
configured a restart delay of 30s.

Let's discuss FLINK-23216 on the JIRA ticket, Gen.

I've now implemented FLINK-23209 [1], but it has the problem that in
a flaky environment you might not want to mark a TaskExecutor dead on the
first connection loss. Maybe this is something we need to make configurable
(e.g. by introducing a threshold, which admittedly is similar to the heartbeat
timeout) so that the user can tune it for her environment. On the
upside, if you mark the TaskExecutor dead on the first connection loss
(assuming you have a stable network environment), then lost TaskExecutors can
now be detected as fast as the heartbeat interval.

[1] https://issues.apache.org/jira/browse/FLINK-23209
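
For illustration only, a minimal sketch of the configurable-threshold idea
(the class and method names below are assumptions for this example, not the
actual FLINK-23209 implementation):

```
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: tolerate a few connection losses before declaring a TaskExecutor dead. */
final class ConnectionLossTracker {
    // e.g. 1 for stable networks, higher for flaky environments
    private final int maxConsecutiveLosses;
    private final AtomicInteger consecutiveLosses = new AtomicInteger();

    ConnectionLossTracker(int maxConsecutiveLosses) {
        this.maxConsecutiveLosses = maxConsecutiveLosses;
    }

    /** Called when an RPC to the TaskExecutor fails because it is unreachable; true means "mark it dead". */
    boolean onConnectionLoss() {
        return consecutiveLosses.incrementAndGet() >= maxConsecutiveLosses;
    }

    /** Called when the TaskExecutor is heard from again. */
    void reset() {
        consecutiveLosses.set(0);
    }
}
```

With a threshold of 1 this gives the fast detection described above; a larger
value trades detection speed for robustness in a flaky network.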

Cheers,
Till

On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:

> Thanks for sharing, Till and Yang.
>
> @Lu
> Sorry but I don't know how to explain the new test with the log. Let's
> wait for others' reply.
>
> @Till
> It would be nice if JIRAs could be fixed. Thanks again for proposing them.
>
> In addition, I was tracking an issue where the RM keeps allocating and
> freeing slots after a TM is lost until its heartbeat timeout, when I found
> the recovery taking as long as the heartbeat timeout. That should be a minor
> bug introduced by declarative resource management. I have created a JIRA
> about the problem [1] and we can discuss it there if necessary.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23216
>
> Lu Niu wrote on Fri, Jul 2, 2021 at 3:13 AM:
>
>> Another side question: shall we add a metric to cover the complete
>> restarting time (phase 1 + phase 2)? The current metric jm.restartingTime
>> only covers phase 1. Thanks!
>>
>> Best
>> Lu
>>
>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>>
>>> Thanks Till and Yang for the help! Also, thanks Till for the quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally throw an
>>> exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>     && errorFrenquecyInMin > 0
>>>     && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>>>   lastStartTime = System.currentTimeMillis();
>>>   throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found phase 1 still takes about 30s and phase 2 dropped
>>> to 1s (because there is no need for container allocation). Why does phase 1
>>> still take 30s even though no TM is lost?
>>>
>>> Related logs:
>>> ```
>>> 2021-06-30 00:55:07,463 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>> 2021-06-30 00:55:07,509 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
>>> RESTARTING.
>>> 2021-06-30 00:55:37,596 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
>>> RUNNING.
>>> 2021-06-30 00:55:38,678 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
>>> all tasks switch from CREATED to RUNNING)
>>> ```
>>> Best
>>> Lu
>>>
>>>
>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:
>>>
Thanks Till and Yang for the help! Also, thanks Till for the quick fix!

I did another test yesterday. In this test, I intentionally throw an
exception from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
    && errorFrenquecyInMin > 0
    && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
  lastStartTime = System.currentTimeMillis();
  throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
}
```
In this case, I found phase 1 still takes about 30s and phase 2 dropped
to 1s (because there is no need for container allocation).

 Some logs:
 ```
 ```


 On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann 
 wrote:

> A quick addition, I think with FLINK-23202 it should now also be
> possible to improve the heartbeat mechanism in the general case. We can
> leverage the unreachability exception thrown if a remote target is no
> longer reachable to mark a heartbeat target as no longer reachable [1].
> This can then be considered as if the heartbeat timeout has been triggered.
> That way we should detect lost TaskExecutors as fast as our heartbeat
> interval is.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23209

Re: [VOTE] Release 1.13.2, release candidate #1

2021-07-02 Thread Dawid Wysakowicz
+1 (binding)

  * verified signatures and checksums
  * reviewed the announcement PR
  * built from sources and ran an example, quickly checked the Web UI
  * checked the diff of pom.xml and NOTICE files from 1.13.1:
      o commons-io updated
      o bundled guava:failureaccess added in flink-sql-connector-kinesis,
        which is properly reflected in the NOTICE file

Best,

Dawid
On 01/07/2021 12:57, Yun Tang wrote:
> Hi everyone,
> Please review and vote on the release candidate #1 for the version 1.13.2, as 
> follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be 
> deployed to dist.apache.org [2], which are signed with the key with 
> fingerprint 78A306590F1081CC6794DC7F62DAD618E07CF996 [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.13.2-rc1" [5],
> * website pull request listing the new release and adding announcement blog 
> post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority 
> approval, with at least 3 PMC affirmative votes.
>
> Best,
> Yun Tang
>
> [1] 
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218&projectId=12315522
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.2-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1429/
> [5] https://github.com/apache/flink/releases/tag/release-1.13.2-rc1
> [6] https://github.com/apache/flink-web/pull/453
>
>




[jira] [Created] (FLINK-23224) Support timestamp_ltz type in hive connector

2021-07-02 Thread Rui Li (Jira)
Rui Li created FLINK-23224:
--

 Summary: Support timestamp_ltz type in hive connector
 Key: FLINK-23224
 URL: https://issues.apache.org/jira/browse/FLINK-23224
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Hive
Reporter: Rui Li








[jira] [Created] (FLINK-23223) When flushAlways is enabled the subpartition may lose notification of data availability

2021-07-02 Thread Yun Gao (Jira)
Yun Gao created FLINK-23223:
---

 Summary: When flushAlways is enabled the subpartition may lose 
notification of data availability
 Key: FLINK-23223
 URL: https://issues.apache.org/jira/browse/FLINK-23223
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Yun Gao


When flushAlways is enabled (namely, the buffer timeout is set to 0), there might be
cases like:
 # The subpartition emits an event which blocks the channel.
 # The subpartition produces more records. However, these records would not be
notified since isBlocked = true.
 # When the downstream tasks resume the subpartition later, the subpartition
would only mark isBlocked as false. For local input channels, although it tries
to re-add the channel if isAvailable = true, this check would not pass since
flushRequest = true.

One case of this issue is https://issues.apache.org/jira/browse/FLINK-22085,
which uses LocalInputChannel.
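
As an illustration, a toy model of the three steps above (this is not Flink's
actual PipelinedSubpartition code; the flags simply mirror the description):

```
/** Toy model of the lost-notification scenario, assuming flushAlways semantics. */
final class ToySubpartition {
    private boolean isBlocked;
    private boolean flushRequested;
    private int backlog;

    void emitBlockingEvent() {          // step 1: an event blocks the channel
        isBlocked = true;
    }

    void addRecord() {                  // step 2: more records arrive
        backlog++;
        boolean firstFlush = !flushRequested;
        flushRequested = true;          // flushAlways keeps requesting flushes
        // The notification only fires on the false -> true transition and never
        // while blocked, so these records leave no pending wake-up behind.
        if (!isBlocked && firstFlush) {
            notifyDataAvailable();
        }
    }

    void resume() {                     // step 3: downstream resumes consumption
        isBlocked = false;
        // Bug as described: the availability check keyed on the flushRequested
        // transition cannot fire again because flushRequested is already true,
        // so backlog > 0 data sits in the subpartition with nobody notified.
    }

    private void notifyDataAvailable() { /* wake up the input channel */ }
}
```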





[jira] [Created] (FLINK-23222) Translate page 'Application Profiling & Debugging' of 'Operations/Debugging' into Chinese

2021-07-02 Thread pierrexiong (Jira)
pierrexiong created FLINK-23222:
---

 Summary: Translate page 'Application Profiling & Debugging' of 
'Operations/Debugging' into Chinese
 Key: FLINK-23222
 URL: https://issues.apache.org/jira/browse/FLINK-23222
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation, Documentation
Affects Versions: 1.13.1
Reporter: pierrexiong


* The markdown file location: 
flink/docs/content.zh/docs/ops/debugging/application_profiling.zh.md
 * The page url is: 
[https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/ops/debugging/application_profiling]
 * Related issue: 
https://issues.apache.org/jira/browse/FLINK-19036?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20chinese-translation%20AND%20text%20~%20%22Application%20Profiling%22





[jira] [Created] (FLINK-23221) Docker image vulnerability

2021-07-02 Thread Razvan AGAPE (Jira)
Razvan AGAPE created FLINK-23221:


 Summary: Docker image vulnerability
 Key: FLINK-23221
 URL: https://issues.apache.org/jira/browse/FLINK-23221
 Project: Flink
  Issue Type: Improvement
  Components: flink-docker
Affects Versions: 1.13.1
 Environment: Issue was discovered by AWS ECR image scanning on 
apache/flink:1.13.1-scala_2.12
Reporter: Razvan AGAPE


The AWS ECR image scanning reports some HIGH vulnerabilities on the
apache/flink:1.13.1-scala_2.12 docker image. In addition, all versions prior to
this one have these issues.

The vulnerabilities are the following:
 # [CVE-2021-33574|https://security-tracker.debian.org/tracker/CVE-2021-33574]
 # [CVE-2019-25013 - for this one a patch has been released in glibc version
2.31-9|https://security-tracker.debian.org/tracker/CVE-2019-25013]

Our security policy does not allow us to deploy images with security
vulnerabilities. Searching the Internet, I found that for the first problem a
patch containing the solution will be released this year.

Do you plan to release a new image containing the newer glibc version in order
to solve these issues?

Also, I checked and the Alpine-based Flink images do not have these
vulnerabilities. Do you plan to release newer versions of Flink based on Alpine
(the latest one is flink:1.8.x)?





[jira] [Created] (FLINK-23220) MailboxProcessor should wait for default action available even if there are no mails

2021-07-02 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-23220:


 Summary: MailboxProcessor should wait for default action available 
even if there are no mails
 Key: FLINK-23220
 URL: https://issues.apache.org/jira/browse/FLINK-23220
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Dawid Wysakowicz
 Fix For: 1.14.0


See the discussion: 
https://github.com/apache/flink/pull/15055#issuecomment-872827247





[jira] [Created] (FLINK-23219) temporary join ttl configuration does not take effect

2021-07-02 Thread waywtdcc (Jira)
waywtdcc created FLINK-23219:


 Summary: temporary join ttl configuration does not take effect
 Key: FLINK-23219
 URL: https://issues.apache.org/jira/browse/FLINK-23219
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Runtime
Affects Versions: 1.12.2
Reporter: waywtdcc
 Attachments: image-2021-07-02-16-29-40-310.png

* version: flink 1.12.2
 * problem: I run a job where table A temporary left joins table B, and set the
table.exec.state.ttl configuration to 3 hours (or 2 seconds, for testing). But
the task state keeps growing for more than 7 days.
 * code
 ```
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(2));

tableEnv.executeSql("drop table if exists persons_table_kafka2");
 String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
 " `id` BIGINT,\n" +
 " `name` STRING,\n" +
 " `age` INT,\n" +
 " proctime as PROCTIME(),\n" +
 " `ts` TIMESTAMP(3),\n" +
 " WATERMARK FOR ts AS ts\n" +
 ") WITH (\n" +
 " 'connector' = 'kafka',\n" +
 " 'topic' = 'persons_test_auto',\n" +
 " 'properties.bootstrap.servers' = 'node2:6667',\n" +
 " 'properties.group.id' = 'testGrodsu1765',\n" +
 " 'scan.startup.mode' = 'group-offsets',\n" +
 " 'format' = 'json'\n" +
 ")";
 tableEnv.executeSql(kafka_source_sql);



 tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
 String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
 " `id` BIGINT,\n" +
 " `name` STRING,\n" +
 " `message` STRING,\n" +
 " `ts` TIMESTAMP(3) ," +
// " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
 " WATERMARK FOR ts AS ts\n" +
 ") WITH (\n" +
 " 'connector' = 'kafka',\n" +
 " 'topic' = 'persons_extra_message_auto',\n" +
 " 'properties.bootstrap.servers' = 'node2:6667',\n" +
 " 'properties.group.id' = 'testGroud125313',\n" +
 " 'scan.startup.mode' = 'group-offsets',\n" +
 " 'format' = 'json'\n" +
 ")";
 tableEnv.executeSql(kafka_source_sql2);


 tableEnv.executeSql(
 "CREATE TEMPORARY VIEW persons_message_table22 AS \n" +
 "SELECT id, name, message,ts \n" +
 " FROM (\n" +
 " SELECT *,\n" +
 " ROW_NUMBER() OVER (PARTITION BY name \n" +
 " ORDER BY ts DESC) AS rowNum \n" +
 " FROM persons_message_table_kafka2 " +
 " )\n" +
 "WHERE rowNum = 1");


 tableEnv.executeSql("" +
 "CREATE TEMPORARY VIEW result_data_view " +
 " as " +
 " select " +
 " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
 " from persons_table_kafka2 t1 " +
 " left join persons_message_table22 FOR SYSTEM_TIME AS OF t1.ts AS t2 on 
t1.name = t2.name "
 );

```
 * the result like  !image-2021-07-02-16-29-40-310.png!





[jira] [Created] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-02 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23218:


 Summary: Distribute the ShuffleDescriptors via blob server
 Key: FLINK-23218
 URL: https://issues.apache.org/jira/browse/FLINK-23218
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.14.0


h3. Introduction

The optimizations introduced in FLINK-21110 so far have improved the
performance of job initialization, failover, and partition releasing. However,
task deployment is still slow. For a job with two vertices, each with 8k
parallelism and connected by an all-to-all edge, it takes 32.611s to deploy all
the tasks and make them transition to running. If the parallelism is 16k, it
may take more than 2 minutes.

Since the creation of TaskDeploymentDescriptors runs in the main thread of the
jobmanager, the jobmanager cannot deal with other akka messages, such as
heartbeats and task status updates, for more than two minutes.

 

All in all, there are currently two issues in the deployment of tasks for
large-scale jobs:
 # It takes a long time to deploy tasks, especially for all-to-all edges.
 # Heartbeat timeouts may happen during or after the task deployment procedure.
For a streaming job, this would cause a failover of the entire region. The job
may never transition to running since there would be another heartbeat timeout
during the deployment of the new tasks.

h3. Proposal

Task deployment involves the following procedures:
 # Jobmanager creates TaskDeploymentDescriptor for each task in the main thread
 # TaskDeploymentDescriptor is serialized in the future executor
 # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
 # TaskExecutors create a new task thread and execute it

The optimization contains two parts:

*1. Cache the compressed serialized value of ShuffleDescriptors*

ShuffleDescriptors in the TaskDeploymentDescriptor describe the
IntermediateResultPartitions that a task consumes. For downstream vertices
connected by an all-to-all edge with _N_ parallelism, we currently calculate
_N_ ShuffleDescriptors _N_ times. However, these vertices all share the same
ShuffleDescriptors, since they consume the same IntermediateResultPartitions.
We don't need to calculate the ShuffleDescriptors for each downstream vertex
individually; we can just cache them. This decreases the overall complexity of
calculating TaskDeploymentDescriptors from O(N^2) to O(N).

Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ times,
so we can cache the serialized value of the ShuffleDescriptors instead of the
original objects. To decrease the size of akka messages and reduce the
transmission of replicated data over the network, these serialized values can
be compressed.
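
To make the caching idea concrete, here is a minimal sketch (the class name and
the JDK-serialization/Deflater combination are assumptions for this example,
not the POC's actual code):

```
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.Deflater;

/** Hypothetical cache: one compressed serialized blob shared by all consumers of a partition group. */
final class ShuffleDescriptorCache<K> {
    private final Map<K, byte[]> cache = new ConcurrentHashMap<>();

    /** Computes the compressed serialized value once per key instead of once per downstream vertex. */
    byte[] getOrCompute(K partitionGroupKey, Serializable descriptors) {
        return cache.computeIfAbsent(partitionGroupKey, k -> compress(serialize(descriptors)));
    }

    private static byte[] serialize(Serializable value) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("Serializing shuffle descriptors failed", e);
        }
    }

    private static byte[] compress(byte[] bytes) {
        Deflater deflater = new Deflater();
        deflater.setInput(bytes);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(bytes.length);
        byte[] buffer = new byte[4096];
        while (!deflater.finished()) {
            out.write(buffer, 0, deflater.deflate(buffer));
        }
        deflater.end();
        return out.toByteArray();
    }
}
```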

*2. Distribute the ShuffleDescriptors via blob server*

For ShuffleDescriptors of vertices with 8k parallelism, the size of their
serialized value is more than 700 Kilobytes. After compression, it would be
around 200 Kilobytes. The overall size of 8k TaskDeploymentDescriptors is more
than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the
TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would
become a heavy burden for the garbage collector to deal with.

In TaskDeploymentDescriptor, JobInformation and TaskInformation are distributed
via the blob server if their sizes exceed a certain threshold (which is defined
as {{blob.offload.minsize}}). TaskExecutors request the information from the
blob server once they begin to process the TaskDeploymentDescriptor. This makes
sure that the jobmanager doesn't need to keep all the copies in heap memory
until all the TaskDeploymentDescriptors are sent. There will be only one copy
on the blob server. Like JobInformation, we can distribute the cached
ShuffleDescriptors via the blob server if their overall size exceeds the
threshold.
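
A sketch of that offload decision (BlobUploader is a hypothetical stand-in for
the blob-server client; the threshold plays the role of blob.offload.minsize):

```
/** Hypothetical: ship small values inline, replace large ones with a blob key. */
interface BlobUploader {
    String upload(byte[] bytes); // returns a key the TaskExecutors can fetch by
}

final class MaybeOffloaded {
    final byte[] inlineBytes; // non-null if carried inside the descriptor
    final String blobKey;     // non-null if stored once on the blob server

    private MaybeOffloaded(byte[] inlineBytes, String blobKey) {
        this.inlineBytes = inlineBytes;
        this.blobKey = blobKey;
    }

    static MaybeOffloaded of(byte[] serialized, long offloadMinSize, BlobUploader uploader) {
        if (serialized.length >= offloadMinSize) {
            // Only one copy lives on the blob server; every descriptor carries just the key.
            return new MaybeOffloaded(null, uploader.upload(serialized));
        }
        return new MaybeOffloaded(serialized, null);
    }
}
```
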
h3. Summary

In summary, the optimization of task deployment is to introduce a cache for the 
TaskDeploymentDescriptor. We cache the compressed serialized value of 
ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
value would be distributed via the blob server.
h3. Comparison

We implemented a POC and conducted an experiment to compare the performance
before and after our optimization. We chose a streaming job for the experiment
because no task starts running until all tasks are deployed, which avoids other
disturbing factors. The job contains two vertices: a source and a sink. They
are connected with an all-to-all edge.

The results illustrated below are the time interval between the timestamp of 
the first task that transitions to _deploying_ and the timestamp of the last 
task that transitions to _running_:
||Parallelism||Before||After ||

Re: [DISCUSS] Feedback Collection Jira Bot

2021-07-02 Thread Piotr Nowojski
+1 for the unassignment remark from Stephan

Piotrek

On Thu, Jul 1, 2021 at 12:35 Stephan Ewen wrote:

> It is true that the bot surfaces problems that are there (not enough
> committer attention sometimes), but it also "rubs salt in the wound" of
> contributors, and that is tricky.
>
> We can try it out with the extended periods (although I think that in
> reality we probably need even longer periods) and see how it goes.
>
> One thing I would suggest is to never let the bot unassign issues. It just
> strikes me as very cold and disrespectful to be unassigned by a bot from an
> issue in which I invested time and energy. (The committers don't even take
> the time to talk to me and explain why the contribution will not go
> forward.)
> Unassignment should come from another person, possibly in response to a
> ping from the bot. I think that makes a big difference in contributor
> treatment.
>
>
>
> On Wed, Jun 30, 2021 at 12:30 PM Till Rohrmann 
> wrote:
>
> > I agree that we shouldn't discourage contributions.
> >
> > For me the main idea of the bot is not to clean up the JIRA but to
> improve
> > our communication and expectation management with the community. There
> are
> > many things we could do but for a lot of things we don't have the time
> and
> > capacity. Then to say at some point that we won't do something is just
> > being honest. This also shows when looking at the JIRA numbers of the
> > merged commits. We very rarely resolve tickets which are older than x
> days
> > and if we do, then we usually create a new ticket for the problem.
> >
> > The fact that we see some tickets with available pull requests go stale
> is
> > the symptom that we don't value them to be important enough or
> > allocate enough time for external contributions imo. Otherwise, they
> would
> > have gotten the required attention and been merged. In such a case,
> raising
> > awareness by pinging the watchers of the respective ticket is probably
> > better than silently ignoring the PR. Also adding labels to filter for
> > these PRs should help to get them the required attention. But also here,
> it
> > happens very rarely that we actually merge a PR that is older than y
> days.
> > Ideally we avoid this situation altogether by only assigning contributors
> > to tickets for which a committer has review capacity. However, this does
> > not seem to always work.
> >
> > In some sense, the JIRA bot shows us the things, which fall through the
> > cracks, more explicitly (which is probably not different than before). Of
> > course we should try to find the time periods for when to ping or
> > de-prioritize tickets that work best for the community.
> >
> > +1 for the proposed changes (extended time periods, "Not a Priority",
> > default priority and fixVersion).
> >
> > @Piotr, I think we have the priorities defined here [1]. Maybe it is
> enough
> > to share the link so that everyone can check whether her assumptions are
> > correct.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Jira+Process
> >
> > Cheers,
> > Till
> >
> > On Wed, Jun 30, 2021 at 10:59 AM Piotr Nowojski 
> > wrote:
> >
> > > > * Introduce "Not a Priority" priority and stop closing tickets.
> > >
> > > +1 for this one (I also like the name you proposed for this Konstantin)
> > >
> > > I also have no objections to other proposals that you summarised. Just
> a
> > > remark, that independently of this discussion we might want to revisit
> or
> > > reconfirm the priorities and their definition/interpretation across all
> > > contributors.
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Wed, Jun 30, 2021 at 10:15 Konstantin Knauf wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Thank you for the additional comments and suggestions.
> > > >
> > > > @Stephan, Kurt: I agree that we shouldn't discourage or dishearten
> > > > contributors, and probably 14 days until a ticket becomes
> > > "stale-assigned"
> > > > are too few. That's why I've already proposed to increase that to 30
> > > days.
> > > > Similarly the times for Major/Critical tickets can be increased. From
> > my
> > > > perspective, the root causes are the following:
> > > >
> > > > * tickets are opened with the wrong priority (see
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Jira+Process#FlinkJiraProcess-TicketsPriorities
> > > > ).
> > > > Here it might help to change the default priority.
> > > > * committers don't have the time to review tickets or don't bring
> > > community
> > > > contributions to a resolution. The Jira bot makes this fact more
> > visible.
> > > > Without the Jira Bot no external contributor would get more
> attention,
> > > and
> > > > no external contribution would be merged faster. Ideally, it'd be the
> > > > opposite and committers would actively monitor tickets with labels
> > > > "stale-assigned" and "pull-request-available" in order to review
> those
> > > with
> > > > priority. That's also why I am not a fan 

Re: Job Recovery Time on TM Lost

2021-07-02 Thread Gen Luo
Thanks for sharing, Till and Yang.

@Lu
Sorry but I don't know how to explain the new test with the log. Let's wait
for others' reply.

@Till
It would be nice if JIRAs could be fixed. Thanks again for proposing them.

In addition, I was tracking an issue where the RM keeps allocating and
freeing slots after a TM is lost until its heartbeat timeout, when I found
the recovery taking as long as the heartbeat timeout. That should be a minor
bug introduced by declarative resource management. I have created a JIRA
about the problem [1] and we can discuss it there if necessary.

[1] https://issues.apache.org/jira/browse/FLINK-23216

Lu Niu wrote on Fri, Jul 2, 2021 at 3:13 AM:

> Another side question: shall we add a metric to cover the complete
> restarting time (phase 1 + phase 2)? The current metric jm.restartingTime
> only covers phase 1. Thanks!
>
> Best
> Lu
>
> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>
>> Thanks Till and Yang for the help! Also, thanks Till for the quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw an
>> exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>     && errorFrenquecyInMin > 0
>>     && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>>   lastStartTime = System.currentTimeMillis();
>>   throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, I found phase 1 still takes about 30s and phase 2 dropped
>> to 1s (because there is no need for container allocation). Why does phase 1
>> still take 30s even though no TM is lost?
>>
>> Related logs:
>> ```
>> 2021-06-30 00:55:07,463 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>> 2021-06-30 00:55:07,509 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
>> RESTARTING.
>> 2021-06-30 00:55:37,596 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
>> RUNNING.
>> 2021-06-30 00:55:38,678 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
>> all tasks switch from CREATED to RUNNING)
>> ```
>> Best
>> Lu
>>
>>
>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:
>>
>>> Thanks Till and Yang for the help! Also, thanks Till for the quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally throw an
>>> exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>     && errorFrenquecyInMin > 0
>>>     && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>>>   lastStartTime = System.currentTimeMillis();
>>>   throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found phase 1 still takes about 30s and phase 2 dropped
>>> to 1s (because there is no need for container allocation).
>>>
>>> Some logs:
>>> ```
>>> ```
>>>
>>>
>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann 
>>> wrote:
>>>
 A quick addition, I think with FLINK-23202 it should now also be
 possible to improve the heartbeat mechanism in the general case. We can
 leverage the unreachability exception thrown if a remote target is no
 longer reachable to mark a heartbeat target as no longer reachable [1].
 This can then be considered as if the heartbeat timeout has been triggered.
 That way we should detect lost TaskExecutors as fast as our heartbeat
 interval is.

 [1] https://issues.apache.org/jira/browse/FLINK-23209
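
A hedged sketch of that wiring (the names here are illustrative; it only
assumes an unreachability exception of the kind FLINK-23202 introduces):

```
/** Hypothetical glue: route an RPC unreachability failure into the heartbeat-timeout path. */
final class UnreachabilityToHeartbeatBridge {
    interface HeartbeatTimeoutHandler {
        void onHeartbeatTimeout(String targetId);
    }

    private final HeartbeatTimeoutHandler handler;

    UnreachabilityToHeartbeatBridge(HeartbeatTimeoutHandler handler) {
        this.handler = handler;
    }

    /** Invoked when an RPC future to the target completes exceptionally. */
    void onRpcFailure(String targetId, Throwable failure) {
        if (indicatesUnreachability(failure)) {
            // Short-circuit: react immediately instead of waiting for heartbeat.timeout.
            handler.onHeartbeatTimeout(targetId);
        }
    }

    private static boolean indicatesUnreachability(Throwable t) {
        // Assumption: a dedicated exception type marks "remote endpoint is gone".
        return t.getClass().getSimpleName().contains("RecipientUnreachable");
    }
}
```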

 Cheers,
 Till

 On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:

> Since you are deploying Flink workloads on Yarn, the Flink
> ResourceManager should get the container
> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
> which is 8 seconds by default.
> And the Flink ResourceManager will release the dead TaskManager container
> once it has received the completion event.
> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>
>
> I think most of the time cost in Phase 1 might be cancelling the tasks
> on the dead TaskManagers.
>
>
> Best,
> Yang
>
>
> Till Rohrmann wrote on Thu, Jul 1, 2021 at 4:49 PM:
>
>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>> the primary means to detect dead TaskManagers. This means that Flink will
>> take at least `heartbeat.timeout` time before the system recovers. Even 
>> if
>> the cancellation happens fast (e.g. by 

Re: [ANNOUNCE] Criteria for merging pull requests is updated

2021-07-02 Thread Kurt Young
It seems disabling the merge button was only proposed for the release
testing phase, which IMO doesn't mean we can't use it outside of that phase.

Best,
Kurt


On Fri, Jul 2, 2021 at 3:01 PM Xintong Song  wrote:

> It was part of the draft proposed in this mail [1]. And before that, it was
> brought up several times in both ML discussions [2][3] and IIRC offline
> release syncs.
>
> If that is too implicit and there are objections, I'm open to continuing
> that discussion.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://lists.apache.org/thread.html/r09c4b8a03bc431adb5d7eaa17cb8e849f16da7a802b20798f32235cc%40%3Cdev.flink.apache.org%3E
> [2]
>
> https://lists.apache.org/thread.html/r76e1cdba577c6f4d6c86b23fdaeb53c4e3744c20d0b3e850fc2e14a7%40%3Cdev.flink.apache.org%3E
> [3]
>
> https://lists.apache.org/thread.html/r25ed92303cdefe41cdcc2935c2b06040b1bc7590ded01a26506a1e49%40%3Cdev.flink.apache.org%3E
>
> On Fri, Jul 2, 2021 at 2:19 PM Chesnay Schepler 
> wrote:
>
> > > - SHOULD NOT use the GitHub UI to merge PRs
> >
> > Where was this discussed?
> >
> >
> > On 7/2/2021 6:59 AM, Xintong Song wrote:
> > > Hi Flink committers,
> > >
> > > As previously discussed [1], the criteria for merging pull requests has
> > > been updated.
> > >
> > > A full version of guidelines can be found on the project wiki [2]. The
> > > following are some of the highlights.
> > > - MUST make sure the CI tests pass before merging PRs
> > > - SHOULD NOT use the GitHub UI to merge PRs
> > > - For frequent test instabilities that are temporarily disabled, the
> > > corresponding JIRA tickets must be made BLOCKER
> > >
> > > I'd like to kindly ask all Flink committers to please read through the
> > new
> > > guidelines and merge PRs accordingly.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > > [1]
> > >
> >
> https://lists.apache.org/thread.html/r136028559a23e21edf16ff9eba6c481f68b4154c6454990ee89af6e2%40%3Cdev.flink.apache.org%3E
> > >
> > > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests
> > >
> >
> >
>


[jira] [Created] (FLINK-23217) Support StreamExecValues json serialization/deserialization

2021-07-02 Thread godfrey he (Jira)
godfrey he created FLINK-23217:
--

 Summary: Support StreamExecValues json 
serialization/deserialization
 Key: FLINK-23217
 URL: https://issues.apache.org/jira/browse/FLINK-23217
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.14.0








Re: [ANNOUNCE] Criteria for merging pull requests is updated

2021-07-02 Thread Xintong Song
It was part of the draft proposed in this mail [1]. And before that, it was
brought up several times in both ML discussions [2][3] and IIRC offline
release syncs.

If that is too implicit and there are objections, I'm open to continuing
that discussion.

Thank you~

Xintong Song


[1]
https://lists.apache.org/thread.html/r09c4b8a03bc431adb5d7eaa17cb8e849f16da7a802b20798f32235cc%40%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/r76e1cdba577c6f4d6c86b23fdaeb53c4e3744c20d0b3e850fc2e14a7%40%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/r25ed92303cdefe41cdcc2935c2b06040b1bc7590ded01a26506a1e49%40%3Cdev.flink.apache.org%3E

On Fri, Jul 2, 2021 at 2:19 PM Chesnay Schepler  wrote:

> > - SHOULD NOT use the GitHub UI to merge PRs
>
> Where was this discussed?
>
>
> On 7/2/2021 6:59 AM, Xintong Song wrote:
> > Hi Flink committers,
> >
> > As previously discussed [1], the criteria for merging pull requests has
> > been updated.
> >
> > A full version of guidelines can be found on the project wiki [2]. The
> > following are some of the highlights.
> > - MUST make sure the CI tests pass before merging PRs
> > - SHOULD NOT use the GitHub UI to merge PRs
> > - For frequent test instabilities that are temporarily disabled, the
> > corresponding JIRA tickets must be made BLOCKER
> >
> > I'd like to kindly ask all Flink committers to please read through the
> new
> > guidelines and merge PRs accordingly.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> > [1]
> >
> https://lists.apache.org/thread.html/r136028559a23e21edf16ff9eba6c481f68b4154c6454990ee89af6e2%40%3Cdev.flink.apache.org%3E
> >
> > [2]
> https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests
> >
>
>


[jira] [Created] (FLINK-23216) RM keeps allocating and freeing slots after a TM lost until its heartbeat timeout

2021-07-02 Thread Gen Luo (Jira)
Gen Luo created FLINK-23216:
---

 Summary: RM keeps allocating and freeing slots after a TM lost 
until its heartbeat timeout
 Key: FLINK-23216
 URL: https://issues.apache.org/jira/browse/FLINK-23216
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.13.1
Reporter: Gen Luo


In Flink 1.13, it's observed that the ResourceManager keeps allocating and
freeing slots with a new TM when it's notified by yarn that a TM is lost. The
behavior continues until the JM marks the TM as FAILED when its heartbeat
timeout is reached. It can be easily reproduced by increasing akka.ask.timeout
and heartbeat.timeout, for example to 10 min.
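
For instance, a minimal flink-conf.yaml sketch of that reproduction setup (the
exact values are only an example; heartbeat.timeout is given in milliseconds):

```
# Stretch both timeouts to ~10 minutes so the window of wrong behavior is easy to observe.
akka.ask.timeout: 10 min
heartbeat.timeout: 600000
```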

 

After tracking, we find the procedure is like this:
 # When a TM is killed, yarn will first receive the event and notify the RM.
 # In Flink 1.13, the RM uses declarative resource management to manage the
slots. It will find a lack of resources when receiving the notification, and
then request a new TM from yarn.
 # The RM will then require the new TM to connect and offer slots to the JM.
 # But from the JM's point of view, all slots are fulfilled, since the lost TM
is not considered disconnected until the heartbeat timeout is reached, so the
JM will reject all slot offers.
 # The new TM will find no slot serving the JM, and then disconnect from the JM.
 # The RM will then find a lack of resources again and go back to step 3,
requiring the new TM to connect and offer slots to the JM, but it won't request
another new TM from yarn.

 

The original log is lost, but it looked like this:

o.a.f.r.r.s.DefaultSlotStatusSyncer - Freeing slot xxx.

...(repeated several lines for different slots)...

o.a.f.r.r.s.DefaultSlotStatusSyncer - Starting allocation of slot xxx from
container_xxx for job xxx.

...(repeated several lines for different slots)...

 

This could be fixed in several ways, such as notifying the JM as well when the
RM receives a TM-lost notification, or having TMs not offer slots until
required, etc. But all these ways have side effects, so they may need further
discussion.

Besides, this should no longer be an issue after FLINK-23209 is done.

 





[RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-02 Thread Yun Gao
Hi there,

Since the voting time of FLIP-147[1] has passed, I'm closing the vote now.

There were seven +1 votes (6 of 7 binding) and no -1 votes:

- Dawid Wysakowicz (binding)
- Piotr Nowojski (binding)
- Jiangang Liu (binding)
- Arvid Heise (binding)
- Jing Zhang (binding)
- Leonard Xu (non-binding)
- Guowei Ma (binding)

Thus I'm happy to announce that the update to the FLIP-147 is accepted.

Many thanks, everyone!

Best,
Yun

[1]  https://cwiki.apache.org/confluence/x/mw-ZCQ

[jira] [Created] (FLINK-23215) flink-table-code-splitter: NOTICE should be in META-INF

2021-07-02 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-23215:


 Summary: flink-table-code-splitter: NOTICE should be in META-INF
 Key: FLINK-23215
 URL: https://issues.apache.org/jira/browse/FLINK-23215
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Jingsong Lee
Assignee: Caizhi Weng
 Fix For: 1.14.0








[jira] [Created] (FLINK-23214) Make ShuffleMaster a cluster level shared service

2021-07-02 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-23214:
---

 Summary: Make ShuffleMaster a cluster level shared service
 Key: FLINK-23214
 URL: https://issues.apache.org/jira/browse/FLINK-23214
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Yingjie Cao


This ticket tries to make the ShuffleMaster a cluster-level shared service,
which makes it consistent with the ShuffleEnvironment on the TM side.





Re: [ANNOUNCE] Criteria for merging pull requests is updated

2021-07-02 Thread Chesnay Schepler

- SHOULD NOT use the GitHub UI to merge PRs


Where was this discussed?


On 7/2/2021 6:59 AM, Xintong Song wrote:

Hi Flink committers,

As previously discussed [1], the criteria for merging pull requests has
been updated.

A full version of guidelines can be found on the project wiki [2]. The
following are some of the highlights.
- MUST make sure the CI tests pass before merging PRs
- SHOULD NOT use the GitHub UI to merge PRs
- For frequent test instabilities that are temporarily disabled, the
corresponding JIRA tickets must be made BLOCKER

I'd like to kindly ask all Flink committers to please read through the new
guidelines and merge PRs accordingly.

Thank you~

Xintong Song


[1]
https://lists.apache.org/thread.html/r136028559a23e21edf16ff9eba6c481f68b4154c6454990ee89af6e2%40%3Cdev.flink.apache.org%3E

[2] https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests