[jira] [Created] (FLINK-30815) BatchTestBase/BatchAbstractTestBase are using Junit4 while some child tests are using JUnit5

2023-01-28 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-30815:
---

 Summary: BatchTestBase/BatchAbstractTestBase are using Junit4 
while some child tests are using JUnit5
 Key: FLINK-30815
 URL: https://issues.apache.org/jira/browse/FLINK-30815
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.16.0
Reporter: Zhu Zhu


BatchTestBase/BatchAbstractTestBase are using JUnit4, while some child tests 
(e.g. DynamicFilteringITCase) are using JUnit5. This may break some assumptions 
and hide problems.
For example, a child test will create a MiniCluster by itself instead of 
using the MiniCluster (TM=1, slots=3) created in BatchAbstractTestBase. The 
self-created MiniCluster may have more slots and hide resource deadlock issues.
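
A minimal sketch of the mismatch, assuming the base class wires up the shared
MiniCluster via a JUnit4 @ClassRule (the class below is illustrative, not the
actual base-class code):
{code:java}
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public abstract class BatchAbstractTestBaseSketch {
    // JUnit4: shared MiniCluster with deliberately small resources (1 TM, 3 slots),
    // meant to surface resource deadlocks in child tests.
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(3)
                            .build());
}
// A JUnit5 child test (org.junit.jupiter.api.Test) never triggers the @ClassRule
// above, so it silently falls back to a MiniCluster with default (larger) resources.
{code}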





[jira] [Created] (FLINK-30814) The parallelism of sort after a global partitioning is forced to be 1

2023-01-28 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-30814:
---

 Summary: The parallelism of sort after a global 
partitioning is forced to be 1
 Key: FLINK-30814
 URL: https://issues.apache.org/jira/browse/FLINK-30814
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.0
Reporter: Zhu Zhu
 Fix For: 1.17.0


The parallelism of sort after a global partitioning is forced to be 1. This 
may lead to the parallelism being changed by the adaptive batch scheduler, 
which is unexpected.
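
For illustration, a minimal batch job whose plan ends in such a global sort (the
table name and data are made up; this only shows the kind of query affected, not
the fix):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class GlobalSortExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.executeSql(
                "CREATE TABLE src (a INT, b STRING) WITH ("
                        + " 'connector' = 'datagen', 'number-of-rows' = '100')");
        // ORDER BY over the whole input implies a global (single) partitioning followed
        // by a sort, whose parallelism is expected to stay 1 under the adaptive batch scheduler.
        tEnv.executeSql("SELECT * FROM src ORDER BY a").print();
    }
}
{code}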





unsubscribe

2023-01-28 Thread Jake


Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-01-28 Thread yuxia
I agree with Jing Ge and share the same concern that SinkV2 may not be stable 
enough to be graduated.
After a quick browse, I found some known bugs [1][2] that need to be fixed.
[1] https://issues.apache.org/jira/browse/FLINK-30238
[2] https://issues.apache.org/jira/browse/FLINK-29459

Best regards,
Yuxia

- Original Message -
From: "Jing Ge" 
To: "dev" 
Cc: "Yun Gao" , "Qingsheng Ren" , 
"Jark Wu" 
Sent: Tuesday, January 24, 2023 7:25:59 PM
Subject: Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

Hi Martijn,

Thanks for sharing more information to help us have a clear big picture.

1) I agree with you. We should graduate the SinkV2 API asap. I have also
tried many times in the past, and there were always some issues that
postponed the graduation, and they made sense. One rule for checking whether
an API is ready to be @Public is not to check when it was introduced but to
analyse when and how big the last change was. Most of the SinkV2 implementation
was done or updated in early 2022 because of FLIP-191 [1]. Since then, there
have been many bug fixes, which shows it is not stable enough to be
graduated.

2, 3, 4) One reason that FLIP-197 [2] proposed two release cycles for each
graduation (from @PublicEvolving to @Public) is to give both Flink users and
Flink developers enough time to evaluate, improve, and stabilize the API.
What we really check is whether the implementations meet the requirements,
which in turn shows that the API is a good fit. That's why I said the Sink API
is a special case: connectors are the implementations of the Sink API. There
is a strong dependency between them; they are not orthogonal. I checked all
connectors that implement SinkV2, and none of them is @Public. It is weird to
graduate the interfaces (the design) without a single successful graduation of
their implementations. For the Sink API, since it has to cover so many
heterogeneous downstream systems, having three graduated implementations, as I
said, is a safer process.

6) FileSink can be used in both streaming and batch mode. JDBC is more or less
a (pure) batch-oriented case.

Long story short, the question is actually about how much risk we can
live with. I personally feel comfortable graduating it along with some
connectors (Kafka, File, and JDBC) in 1.18. Doing it in 1.17 is a little bit
risky. Since @Qingsheng Ren  and @Yun Gao
 have been working on Sink, I would like to have
their thoughts. They are still on public holidays and might join this thread
next week. It would be great if we could keep this thread open until then.
If all of you are aware of those risks and are still fine with it, I will
be happy to see the SinkV2 API graduated with the 1.17 release. :-)

Best regards,
Jing



[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process

On Thu, Jan 19, 2023 at 10:08 AM Martijn Visser 
wrote:

> Hi Jing,
>
> Thanks for your input!
>
> 1) I think that we have had more than two release cycles for the Sink V2
> API. The first release of the Sink API was introduced with Flink 1.12 which
> was December 2020, more than 2 years ago. The additional feature that Sink
> V2 has introduced was the ability to hook custom topologies, everything
> else remained the same. Like you've mentioned, the Sink API is super
> important, which is why I don't understand why we actually shouldn't move it
> to @Public. As outlined in FLIP-197, "The community should also try to
> stabilize new APIs as soon as possible so that Flink users can rely on
> them. If we don’t do this, then users start building against
> @PublicEvolving and weaker annotated APIs. This might then lead to
> disappointment."
>
> 2 & 3) The ElasticSearch Sink is also using Sink V2, as is OpenSearch and
> the Sinks that are using the Async Sink from FLIP-171 [1]. So I think
> that's also AWS Firehose, Kinesis, DynamoDB. It's actually weird that
> KafkaSink is not yet at @PublicEvolving, given that the FlinkKafkaProducer
> has already been marked as deprecated with Flink 1.14 [2]. The FileSink
> being experimental is even weirder, given that it was introduced in Flink
> 1.12 [3] and should also have been @Public. I think the fact that some
> functionality might be missing should not be a reason not to graduate
> an API to the next phase (that also applies to nr 5 from your email). I
> believe the intent of the graduation process is to guarantee that
> functionality that already exists won't be broken and will remain working
> if you've currently built an implementation against it.
>
> 4) I think the graduation of a connector is orthogonal to the graduation of
> the Flink interfaces that such a connector is using. I don't think that
> we're treating Kafka like a first class connector over the others: there
> are many more connectors that are using the Sink V2 interfaces.
>
> 6) Given that we have the FileSink that already uses the 

[jira] [Created] (FLINK-30813) Residual zk data when using the kubernetes session mode

2023-01-28 Thread liuzhuo (Jira)
liuzhuo created FLINK-30813:
---

 Summary: Residual zk data when using the kubernetes session mode
 Key: FLINK-30813
 URL: https://issues.apache.org/jira/browse/FLINK-30813
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Reporter: liuzhuo


    If we use Kubernetes session mode and use ZooKeeper (ZK) as the HA service, 
the HA data on ZK is not cleared after the session stops.

    This is because, when deleting a session, only this method is called:
{code:java}
kubernetesClusterDescriptor.killCluster(clusterId);
{code}
    However, this method only deletes the deployment associated with the 
clusterId. If ZK is used as the HA service, the HA data on ZK is left behind 
when the session stops, resulting in more and more data on ZK.

 

Maybe we need to additionally call
{code:java}
ClusterClient#shutDownCluster(){code}
or
{code:java}
HighAvailabilityServices#closeAndCleanupAllData(){code}
when using session mode (native Kubernetes).
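
A hedged sketch (not the actual Flink code path; the retrieve/try-with-resources
wiring is only illustrative) of what the additional cleanup could look like next
to the existing kill call:
{code:java}
// Ask the running session cluster to shut itself down first, so its
// HighAvailabilityServices can clean up the ZooKeeper znodes and HA blobs.
try (ClusterClient<String> clusterClient =
        kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient()) {
    clusterClient.shutDownCluster();
} catch (Exception e) {
    // Best effort: still delete the Kubernetes deployment below.
}
kubernetesClusterDescriptor.killCluster(clusterId);
{code}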





[jira] [Created] (FLINK-30812) YARN with S3 resource storage fails for Hadoop 3.3.2

2023-01-28 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-30812:


 Summary: YARN with S3 resource storage fails for Hadoop 3.3.2
 Key: FLINK-30812
 URL: https://issues.apache.org/jira/browse/FLINK-30812
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.16.0
Reporter: Mate Czagany


In HADOOP-17139, S3AFileSystem#copyFromLocalFile was refactored and now expects 
the local source Hadoop Path object to have a scheme specified, which the 
YarnClusterDescriptor uploading the local files won't have.

When uploading files to S3, CopyFromLocalOperation#getFinalPath compares the 
passed source Hadoop Path with the file it found (which will have the file:// 
scheme) using URI.relativize, but this fails because of the scheme difference 
and throws a PathIOException, as can be seen in this exception:

 
{code:java}
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy 
Yarn Application Cluster
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]        ..
Caused by: org.apache.hadoop.fs.PathIOException: `Cannot get relative path for 
URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp':
 Input/output error
at 
org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
 ~[hadoop-common-3.3.3.jar!/:?]
at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
 ~[hadoop-common-3.3.3.jar!/:?]
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1047)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:623)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:471)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
... 35 more {code}
 

The easiest solution would possibly be to somehow add the file:// scheme in 
YarnApplicationFileUploader#copyToRemoteApplicationDir.

The other solution would be to change all calls uploading local files to use 
"new Path(file.toURI())" instead of "new Path(file.getAbsolutePath())", but it 
might not be as future-proof as the first solution.
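
A small illustration of the difference between the two call styles (the file path
is made up; only the presence or absence of the scheme matters):
{code:java}
import java.io.File;
import org.apache.hadoop.fs.Path;

public class PathSchemeSketch {
    public static void main(String[] args) {
        File localFile = new File("/tmp/flink-conf.yaml");

        // Current style: no scheme, which CopyFromLocalOperation#getFinalPath cannot
        // relativize against the file:// path that S3A resolves internally.
        Path withoutScheme = new Path(localFile.getAbsolutePath()); // /tmp/flink-conf.yaml

        // Proposed style: the URI carries an explicit file scheme.
        Path withScheme = new Path(localFile.toURI()); // file:/tmp/flink-conf.yaml

        System.out.println(withoutScheme + " vs " + withScheme);
    }
}
{code}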

Email thread: [https://lists.apache.org/thread/oo5rlyo3jr7kds2y6wwnfo1yhnk0fx4c]

 

If a committer can assign this ticket to me, I can start working on it.





Re: [Discuss] Conventions on driving FLIPs towards consensus

2023-01-28 Thread Dong Lin
Thanks Xintong for initiating and sharing the discussion!

The agreement described in the summary looks pretty good. In particular, it
is great to know that ASF already has guidance requiring "voter must
provide with the veto a technical justification showing why the change is
bad". This is exactly what we need. This is an important step towards
making our FLIP discussion process more collaborative and productive :)

While I agree it is important to work on trust, it is probably still useful
to document the agreement described above in the wiki, so that new Flink
contributors can follow this guidance easily rather than having to read
this email thread. For example, we can probably have something like this
guideline that specifies what MUST be done and what SHOULD (preferred but not
required) be done to merge a PR. The use of MUST vs. SHOULD could help us
achieve a balance between trust and rules.

I guess the guidance can include something like this:
- Voter *must* provide with the veto a technical justification showing why
the change is bad.
- The discussion thread *must* last for at least 5 days (excluding weekends
and national holidays).
- The voting thread *must* last for at least 3 days (excluding weekends and
national holidays).
- The discussion thread *should* allow 5 days for the reviewer to reply.

I am wondering what other contributors think regarding putting the
agreement in the wiki.

Cheers,
Dong


On Sat, Jan 28, 2023 at 12:26 PM Xintong Song  wrote:

> Hi devs,
>
> Recently, a discussion about how to drive FLIPs towards consensus has taken
> place on the private@ mailing list. While many PMC members have already
> shared their opinions, we believe for this topic the opinions of other
> committers / contributors are equally important. Therefore, we are moving
> the discussion to the dev@ mailing list for a wider involvement.
>
> ### Background
>
> Flink Improvement Proposal (FLIP) [1], the process for proposing,
> discussing, reviewing and voting on major changes to Flink, plays an
> important role in driving the project forward. According to the process,
> *consensus*[2] is required for any proposal to be accepted. This means
> objections from any single committer can block the process. It has been
> observed many times that a FLIP is long blocked on a disapproving
> committer. In most cases, this is necessary for addressing technical
> concerns. However, there are also cases that after raising concerns the
> committer becomes unresponsive (due to other works, personal vacation
> plans, etc.), leaving the FLIP blocked for an unnecessarily long time.
>
> The purpose of this discussion is to come up with some conventions on
> preventing FLIPs from being long blocked on unresponsive reviewers.
>
> ### Summary of the previous discussion on private@
>
>- Most people agree that the progress of a FLIP should not be long
>blocked on an unresponsive reviewer. When a reviewer blocks the
> progress of
>a FLIP, he/she should be responsive to the subsequent replies, or at
> least
>provide a reasonable estimated time of response.
>- As for how long we should wait for the responses, there’s a tendency
>towards relying on the judgement of individuals while also having a
>typically recommended time (1 week).
>- Committers should use their veto rights with care. Per the ASF policy
>[3], vetos must be provided with a technical justification showing why
> the
>change is bad. They should not be used for simply blocking the process
> so
>the voter has more time to catch up.
>- We’d also encourage the FLIP proposers to actively reach out to the
>interested parties (e.g., previous contributors of the relevant part)
>early. It helps expose and address the potential concerns early and also
>leaves more time for other parties to respond while the proposer works
> on
>the FLIP.
>- We’d like to work on trust rather than heavy rules and processes. All
>above, if agreed, should be conventions among the community. We would
> not
>formally change the FLIP process.
>
>
> Looking forward to your opinions.
>
>
> Best,
>
> Xintong
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws#FlinkBylaws-Approvals
>
> [3] https://www.apache.org/foundation/voting.html#Veto
>


Re: [DISCUSS] FLIP-274 : Introduce metric group for OperatorCoordinator

2023-01-28 Thread Hang Ruan
Hi all,

Thanks for all the comments. Sorry for my late reply.

I think I understand most of the suggestions. I still have the same
question as Jark about adding the task id/name.
I have also refactored the POC like this (
https://github.com/ruanhang1993/flink/commit/321192d17afbb9d00285a78a121676ae5e371292),
whose scope has the task name. When we initialize the coordinator, it is
hard to get the task name the way we do in the TaskMetricGroup.
I also found a weird thing: ExecutionJobVertex#getTaskInformationOrBlobKey
will use jobVertex.getName() as the task name,
but ExecutionJobVertex#createOperatorCoordinatorHolder will also use
jobVertex.getName() as the operatorName
for LazyInitializedCoordinatorContext. So the task name may end up
duplicated as the operator name in the metric name.
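
As a minimal sketch of the layering being discussed (the variables and group
names below are illustrative assumptions, not the FLIP's final API): the JM-side
operator group carries the task/operator metadata, and the coordinator is just a
plain child group underneath it.
{code:java}
// Assumed to exist in the surrounding JM-side context: jobManagerJobMetricGroup,
// jobVertex, operatorName, unassignedSplits.
MetricGroup jmOperatorGroup =
        jobManagerJobMetricGroup
                .addGroup("taskName", jobVertex.getName())
                .addGroup("operatorName", operatorName);
MetricGroup coordinatorGroup = jmOperatorGroup.addGroup("coordinator");

// A coordinator-only metric that cannot be aggregated from the subtasks.
coordinatorGroup.gauge("numUnassignedSplits", () -> unassignedSplits.size());
{code}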

Best,
Hang

Jark Wu wrote on Sat, Jan 28, 2023 at 16:15:

> Hi all,
>
> IIUC, Chesnay means we should have a more general metric group for
> **operator**
> not for **coordinator** in JM, this would be useful to extend
> other operator-specific
> metrics in the future. That means the new scope format should be designed
> for
> the operator,
> e.g., metrics.scope.jm-operator=.jobmanager..
> The coordinator metric is a subgroup (a constant "coordinator" suffix) of
> the JMOperatorMG.
>
> I think this is a nice design. However, I have a question about adding
> task id/name to this list.
> How to get the task id/name when reporting a coordinator metric? As we
> know, coordinator metrics
> are just like JMJobMetricGroup, which don't belong to any tasks/vertexes.
> Do you mean reporting
> coordinator metrics for every task id under the operator?
>
> Best,
> Jark
>
>
> On Thu, 19 Jan 2023 at 17:33, Chesnay Schepler  wrote:
>
>>  > First, I do not understand why users have to configure the new scope
>> format, which has a default value.
>>
>> If you don't use scope formats, sure. If you do use scope formats, e.g.
>> to add a common prefix (which is the case for datadog users for
>> example), then the current default in the FLIP is insufficient and
>> requires the user to update the configuration.
>>
>>  > I usually do not need to change these configuration of scope formats
>> when submitting the flink job.
>>
>> Scope formats as a whole are quite a power-user feature, but that
>> doesn't mean we should ignore it.
>>
>>  > I try to let it extend the ComponentMetricGroup
>>
>> This isn't inherently required just because it is a "component".
>> Component metric groups should _only_ be used for cases where such a
>> component provides several bits of metadata.
>> For example, tasks provide vertex ids, task names, attempted IDs etc.,
>> and we'd like users to have the option on how this metadata ends up
>> being used (via scope formats). This currently can't be built with the
>> addGroup() methods, hence why the component groups exist.
>> However, the operator coordinator _itself_ does not provide several bits
>> of metadata. Logically, the _operator_ does, not the coordinator.
>>
>>  > This makes me decide to add a new scope format.
>>
>> And this is fine; just don't add a new scope format for the coordinator
>> but logically for operators on the JM side, such that we can extend the
>> set of operator-specific metrics in the future without having to add yet
>> another scope format.
>>
>>  > The relationship between the Operator and OperatorCoordinator is
>> maintained in the OperatorCoordinatorHolder. In the POC, the operator
>> name/id could be found and the OperatorCoordinatorMetricGroup will be
>> created here.
>>
>> That's all irrelevant.
>>
>>  > The registered metrics of OperatorCoordinator always are those which
>> can not be aggregated from the tasks, like the number of unassigned
>> splits in the SourceCoordinator. Actually we have not encountered the
>> scenario you mentioned. I think the OperatorCoordinatorMetricGroup
>> should only contain the metrics for itself instead of its subtasks.
>>
>> You are misunderstanding the problem.
>>
>> This isn't about aggregating metrics on our side or exposing metrics
>> from TMs on the JM side or anything like that.
>>
>> It's purely about the available metadata for operator metrics on the JM
>> side. Currently you suggest operator name / id; what I'm proposing is to
>> add task(==vertex!) id / name to this list.
>>
>> The benefit here is simple. I can look up all metrics where task_id==XXX
>> and get _everything_ related to that vertex, including all metrics
>> associated with operators that are part of that vertex, including the
>> coordinator.
>>
>>  > Using metrics.scope.jm.job is not enough to distinguish the different
>> coordinators.
>>
>> I did not suggest just using metrics.scope.jm.job. I suggested that as
>> the default for the jm operator scope format, that is used for the
>> proposed JobManagerOperatorMetricGroup, which will have some plain
>> metric groups as children for coordinators.
>> Aka, you have the JMOperatorMG, then you call addGroup("coordinators")
>> and then 

Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2023-01-28 Thread Shammon FY
Hi @John

Thanks for your feedback. I'd like to share my thoughts about your
questions and discuss them with you

> 10. Have you considered proposing a general consistency mechanism instead
of restricting it to TableStore+ETL graphs? For example, it seems to me to
be possible and valuable to define instead the contract that sources/sinks
need to implement in order to participate in globally consistent snapshots.

A general consistency mechanism is cool! In my mind, the overall
`consistency system` consists of three components: Streaming & Batch ETL,
Streaming & Batch Storage, and MetaService. MetaService is decoupled from
the Storage Layer, but it stores consistency information in persistent storage.
It can be started as an independent node or as a component in a larger Flink
cluster. In the FLIP we use TableStore as the Storage Layer. As you
mentioned, we plan to implement a specific source and sink on TableStore
in the first phase, and may consider other storages in the future.

> 11a. Engineering may operate one Flink cluster, and some other org, like
Finance may operate another. In most cases, those are separate domains that
don't typically get mixed together in jobs, but some people, like the CEO,
would still benefit from being able to make a consistent query that spans
arbitrary contexts within the business. How well can a feature like this
transcend a single Flink infrastructure? Does it make sense to consider a
model in which snapshots from different domains can be composable?

As mentioned above, MetaService is an independent node; it supports data
consistency across multiple independent Flink clusters for OLAP if:
a) the Timestamp Barrier flows between the Flink clusters through
different storages (Kafka, Table Store, etc.), and b) ETL jobs in the Flink
clusters report their source/sink tables, the snapshots in those tables, and
the processing status of the Timestamp Barrier to MetaService.

> 11b. Some groups may have a relatively stable set of long-running jobs,
while others (like data science, skunkworks, etc) may adopt a more
experimental, iterative approach with lots of jobs entering and exiting the
ecosystem over time. It's still valuable to have them participate in the
consistency model, but it seems like the consistency system will have to
deal with more chaos than I see in the design. For example, how can this
feature tolerate things like zombie jobs (which are registered in the
system, but fail to check in for a long time, and then come back later).

Management of consistency information is an important issue. We hope to
create new FLIPs for specific issues and modules after reaching an
agreement on the overall design of this FLIP. In addition to short-running
jobs and zombie jobs, there may also be data revisions and other
scenarios; I'll add them to the FLIP. If you are interested, you are very
welcome to give more input and work with us on the design and
implementation in the later FLIPs.

> 12. I didn't see any statements about patterns like cycles in the ETL
Topology. I'm aware that there are fundamental constraints on how well
cyclic topologies can be supported by a distributed snapshot algorithm.
However, there are a range of approaches/compromises that we can apply to
cyclic topologies. At the very least, we can state that we will detect
cycles and produce a warning, etc.

The current FLIP does not support multiple jobs writing to the same table,
cycles in the ETL topology, etc. I'll add these constraints to the FLIP. As you
mentioned, we can consider supporting cycles through some approaches, such
as an Operator or Sink which can update the Timestamp Barrier.

> 13. I'm not sure how heavily you're waiting the query syntax part of the
proposal, so please feel free to defer this point. It looked to me like the
proposal assumes people want to query either the latest consistent snapshot
or the latest inconsistent state. However, it seems like there's a
significant opportunity to maintain a manifest of historical snapshots and
allow people to query as of old points in time. That can be valuable for
individuals answering data questions, building products, and crucially
supporting auditability use cases. To that latter point, it seems nice to
provide not only a mechanism to query arbitrary snapshots, but also to
define a TTL/GC model that allows users to keep hourly snapshots for N
hours, daily snapshots for N days, weekly snapshots for N weeks, and the
same for monthly, quarterly, and yearly snapshots.

I quite agree with you. Managing data versions through MetaService and
understanding global data information can help us better manage data in
different storages, including TTL/GC, comparison, etc. This not only
guarantees data safety by avoiding the deletion of data that is still being
used, but also saves storage space.


Best,
Shammon


On Sat, Jan 28, 2023 at 12:46 AM John Roesler  wrote:

> Hello Shammon and all,
>
> Thanks for this FLIP! I've been working toward this kind of global
> consistency across large scale data 

Re: [DISCUSS] FLIP-274 : Introduce metric group for OperatorCoordinator

2023-01-28 Thread Jark Wu
Hi all,

IIUC, Chesnay means we should have a more general metric group for
**operator**
not for **coordinator** in JM, this would be useful to extend
other operator-specific
metrics in the future. That means the new scope format should be designed
for
the operator,
e.g., metrics.scope.jm-operator=.jobmanager..
The coordinator metric is a subgroup (a constant "coordinator" suffix) of
the JMOperatorMG.

I think this is a nice design. However, I have a question about adding task
id/name to this list.
How to get the task id/name when reporting a coordinator metric? As we
know, coordinator metrics
are just like JMJobMetricGroup, which don't belong to any tasks/vertexes.
Do you mean reporting
coordinator metrics for every task id under the operator?

Best,
Jark


On Thu, 19 Jan 2023 at 17:33, Chesnay Schepler  wrote:

>  > First, I do not understand why users have to configure the new scope
> format, which has a default value.
>
> If you don't use scope formats, sure. If you do use scope formats, e.g.
> to add a common prefix (which is the case for datadog users for
> example), then the current default in the FLIP is insufficient and
> requires the user to update the configuration.
>
>  > I usually do not need to change these configuration of scope formats
> when submitting the flink job.
>
> Scope formats as a whole are quite a power-user feature, but that
> doesn't mean we should ignore it.
>
>  > I try to let it extend the ComponentMetricGroup
>
> This isn't inherently required just because it is a "component".
> Component metric groups should _only_ be used for cases where such a
> component provides several bits of metadata.
> For example, tasks provide vertex ids, task names, attempted IDs etc.,
> and we'd like users to have the option on how this metadata ends up
> being used (via scope formats). This currently can't be built with the
> addGroup() methods, hence why the component groups exist.
> However, the operator coordinator _itself_ does not provide several bits
> of metadata. Logically, the _operator_ does, not the coordinator.
>
>  > This makes me decide to add a new scope format.
>
> And this is fine; just don't add a new scope format for the coordinator
> but logically for operators on the JM side, such that we can extend the
> set of operator-specific metrics in the future without having to add yet
> another scope format.
>
>  > The relationship between the Operator and OperatorCoordinator is
> maintained in the OperatorCoordinatorHolder. In the POC, the operator
> name/id could be found and the OperatorCoordinatorMetricGroup will be
> created here.
>
> That's all irrelevant.
>
>  > The registered metrics of OperatorCoordinator always are those which
> can not be aggregated from the tasks, like the number of unassigned
> splits in the SourceCoordinator. Actually we have not encountered the
> scenario you mentioned. I think the OperatorCoordinatorMetricGroup
> should only contain the metrics for itself instead of its subtasks.
>
> You are misunderstanding the problem.
>
> This isn't about aggregating metrics on our side or exposing metrics
> from TMs on the JM side or anything like that.
>
> It's purely about the available metadata for operator metrics on the JM
> side. Currently you suggest operator name / id; what I'm proposing is to
> add task(==vertex!) id / name to this list.
>
> The benefit here is simple. I can look up all metrics where task_id==XXX
> and get _everything_ related to that vertex, including all metrics
> associated with operators that are part of that vertex, including the
> coordinator.
>
>  > Using metrics.scope.jm.job is not enough to distinguish the different
> coordinators.
>
> I did not suggest just using metrics.scope.jm.job. I suggested that as
> the default for the jm operator scope format, that is used for the
> proposed JobManagerOperatorMetricGroup, which will have some plain
> metric groups as children for coordinators.
> Aka, you have the JMOperatorMG, then you call addGroup("coordinators")
> and then addGroup(coordinator_name) for each coordinator.
>
>  > So there are two choice for the
> InternalOperatorCoordinatorMetricGroup: 1. add and improve the new scope
> format; 2. use the metrics.scope.jm.job and ProxyMetricGroup. Which one
> is better?
>
> There are more options than that.
> I feel like there are a lot of misunderstandings in this discussion.
> Please let me know if I could clear things up. If not I can also provide
> a PoC based on your PoC if that can speed things up. It's not that
> different anyway.
>
> On 19/01/2023 07:39, Hang Ruan wrote:
> > Hi, chesnay,
> >
> > Thanks for your reply. I still have some doubts about the questions
> > you raised.
> >
> > > Extending the set of ScopeFormats is problematic because it in
> > practice
> > it breaks the config if users actively rely on it, since there's now
> > another key that they _must_ set for it to be
> > consistent/compatible with
> > their existing setup.
> > 

[jira] [Created] (FLINK-30811) Fix sql gateway can not stop job correctly

2023-01-28 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-30811:
-

 Summary: Fix sql gateway can not stop job correctly
 Key: FLINK-30811
 URL: https://issues.apache.org/jira/browse/FLINK-30811
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Gateway
Affects Versions: 1.17
Reporter: Shengkai Fang
 Fix For: 1.17





