[jira] [Created] (FLINK-22939) Generalize JDK switch in azure setup

2021-06-08 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-22939:


 Summary: Generalize JDK switch in azure setup
 Key: FLINK-22939
 URL: https://issues.apache.org/jira/browse/FLINK-22939
 Project: Flink
  Issue Type: Improvement
  Components: Build System / Azure Pipelines
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.14.0


Our current Azure setup makes it a bit difficult to switch to a different JDK 
because the "jdk" parameter is only evaluated if it is set to "jdk11".

Instead, we could generalize this so that the parameter is always evaluated: 
if the image contains the JDK (under some expected location), one can just 
specify {{jdk: 14}}.
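
As a rough illustration of what "always evaluated" could mean, the sketch below maps any value of the "jdk" parameter to a JDK location. The function name resolve_java_home and the directory layout (/usr/lib/jvm/java-<N>-openjdk-amd64) are assumptions made for the example; the ticket only says the JDK lives "under some expected location".

```python
import posixpath


def resolve_java_home(jdk, base_dir="/usr/lib/jvm"):
    """Map a pipeline 'jdk' parameter (e.g. 8, 14 or "jdk11") to a JDK path.

    Hypothetical helper: the layout <base_dir>/java-<N>-openjdk-amd64 is an
    assumption about the CI image, not something stated in the ticket.
    """
    version = str(jdk).strip().lower()
    # Accept the legacy spelling "jdk11" as well as a bare version number.
    if version.startswith("jdk"):
        version = version[len("jdk"):]
    if not version.isdigit():
        raise ValueError(f"unsupported jdk parameter: {jdk!r}")
    return posixpath.join(base_dir, f"java-{version}-openjdk-amd64")
```

With such a mapping, no JDK version needs special handling in the pipeline definition; an image that lacks the requested JDK would simply fail at the point where the resolved path is used.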



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Definition of idle partitions

2021-06-08 Thread Piotr Nowojski
Hi Eron,

Can you elaborate a bit more on what you mean? I don’t understand what you 
mean by a more general solution. 

As of now, a stream is marked idle by a source/watermark generator, which has 
the effect of temporarily excluding this stream/partition from the min 
watermark calculation in the downstream tasks. However, the stream switches 
back to active when any record is emitted. This is what’s causing the problems 
described by Arvid. 

The core of our proposal is very simple: keep everything as it is, except that 
a stream is changed back to active only once a watermark is emitted again, not 
a record. In other words, we disconnect idleness from the presence of records, 
connect it only to the presence or lack of watermarks, and allow records to be 
emitted while the “stream status” is “idle”.
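
To make the proposed semantics concrete, here is a minimal sketch of a downstream min-watermark computation in which only a watermark, never a record, switches a channel back to active. The class and method names are invented for the illustration and are not Flink's actual runtime classes.

```python
class MinWatermarkAggregator:
    """Toy model of watermark aggregation over several input channels.

    Illustrative only: the names are hypothetical, not Flink APIs.
    """

    def __init__(self, num_channels):
        self.watermarks = [float("-inf")] * num_channels
        self.idle = [False] * num_channels
        self.output = float("-inf")  # last combined watermark, never regresses

    def on_idle(self, channel):
        # The source/watermark generator declared this channel idle.
        self.idle[channel] = True

    def on_record(self, channel):
        # Proposed semantics: a record does NOT change the channel status.
        pass

    def on_watermark(self, channel, timestamp):
        # Only a watermark switches the channel back to active.
        self.watermarks[channel] = max(self.watermarks[channel], timestamp)
        self.idle[channel] = False

    def current_watermark(self):
        # Idle channels are ignored when taking the minimum.
        active = [w for w, idle in zip(self.watermarks, self.idle) if not idle]
        if active:
            self.output = max(self.output, min(active))
        return self.output
```

In this model a timer-driven operator can keep emitting records on an idle channel without affecting the combined watermark; the channel only takes part in the minimum again after its next watermark.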

Piotrek


> On 09.06.2021, at 06:01, Eron Wright wrote:
> 
> It seems to me that idleness was introduced to deal with a very specific
> issue.  In the pipeline, watermarks are aggregated not on a per-split basis
> but on a per-subtask basis.  This works well when each subtask has exactly
> one split.  When a sub-task has multiple splits, various complications
> occur involving the commingling of watermarks.  And when a sub-task has no
> splits, the pipeline stalls altogether.  To deal with the latter problem,
> idleness was introduced.  The sub-task simply declares itself to be idle to
> be taken out of consideration for purposes of watermark aggregation.
> 
> If we're looking for a more general solution, I would suggest we discuss
> how to track watermarks on a per-split basis.  Or, as Till mentioned
> recently, an alternate solution may be to dynamically adjust the
> parallelism of the task.
> 
> I don't agree with the notion that idleness involves a correctness
> tradeoff.  The facility I described above has no impact on correctness.
> Meanwhile, various watermark strategies rely on heuristics involving the
> processing-time domain, and the term idleness seems to have found purchase
> there too.  The connection among the concepts seems tenuous.
> 
> -E
> 
> 
> 
>> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski  wrote:
>> 
>> Hi Arvid,
>> 
>> Thanks for writing down this summary and proposal. I think this was the
>> foundation of the disagreement in FLIP-167 discussion. Dawid was arguing
>> that idleness is intermittent, strictly a task local concept and as such
>> shouldn't be exposed in for example sinks. While me and Eron thought that
>> it's a concept strictly connected to watermarks.
>> 
>> 1. I'm big +1 for changing the StreamStatus definition to stream "providing
>> watermark" and "not providing watermark". With respect to that I agree with
>> Dawid that record bound idleness *(if we would ever need to define/expose
>> it)* should be an intermittent concept, like for example the existing in
>> the Task/runtime input availability (StreamTaskInput#isAvailable).
>> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
>> I also don't have any good ideas.
>> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
>> 
>> Best,
>> Piotrek
>> 
>> On Tue, 8 Jun 2021 at 16:35, Arvid Heise wrote:
>> 
>>> Hi devs,
>>> 
>>> While discussing "Watermark propagation with Sink API" and during
>>> "[FLINK-18934] Idle stream does not advance watermark in connected
>> stream",
>>> we noticed some drawbacks on how Flink defines idle partitions currently.
>>> 
>>> To recap, idleness was always considered as a means to achieve progress
>> in
>>> window operators with idle partition in the source at the risk of losing
>> a
>>> bit of correctness. In particular, records could be considered late,
>> simply
>>> because of that idleness timeout and not because they arrived out of
>> order.
>>> A potential reprocessing would not be causing these records to be
>>> considered late and we may end up with a different (correct) result.
>>> 
>>> The drawbacks that we discovered are as follows:
>>> - We currently only use idleness to exclude respective upstream tasks
>> from
>>> participating in watermark generation.
>>> - However, the definition is bound to records. [1] In particular, while a
>>> partition is idle, no records should be produced.
>>> - That brings us into quite a few edge cases, where operators emit
>> records,
>>> while they are actually idling: Think of timers, asyncIO operators,
>> window
>>> operators based on timeouts, etc. that trigger on an operator ingesting
>> an
>>> idle partition.
>>> - The proper solution would be to turn the operator active while emitting
>>> and to return to being idle afterwards (but when?). However, this has
>> some
>>> unintended side-effects depending on when you switch back:
>>>  - If you toggle stream status for each record, you get a huge overhead
>> on
>>> stream status records and quite a bit of processing in downstream
>> operators
>>> (that code path is not much optimized since switching is considered a
>> rare
>>> thing).
>>>  - If you 

[jira] [Created] (FLINK-22938) Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout

2021-06-08 Thread Bhagi (Jira)
Bhagi created FLINK-22938:

 Summary: Slot request bulk is not fulfillable! Could not allocate 
the required slot within slot request timeout
 Key: FLINK-22938
 URL: https://issues.apache.org/jira/browse/FLINK-22938
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.12.4
Reporter: Bhagi


Hi team,

I tested a cluster upgrade from Flink 1.12.4 to 1.13.1. Because of an issue 
with one job, the upgraded cluster went into CrashLoopBackOff with an error, so 
I downgraded from 1.13.1 back to 1.12.4. The downgrade itself was successful, 
but all job executions now end up in the failed state with the following error: 
"Slot request bulk is not fulfillable! Could not allocate the required slot 
within slot request timeout". Please find the logs below.

## Flink config file ##
flink@flink-jobmanager-657cb5d847-5b579:~$ cat conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 2
jobmanager.rpc.address: flink-jobmanager
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
rest.connection-timeout: 25000
web.log.path: /opt/flink/log/output.log
taskmanager.log.path: /opt/flink/log/output.log
state.backend: rocksdb
state.checkpoints.dir: file:///persistent/flinkData/checkpoints
state.backend.rocksdb.log.dir: /persistent/flinkData/rocksdb/logging/
state.savepoints.dir:  file:///persistent/flinkData/savepoints
state.backend.incremental: true
state.checkpoints.num-retained: 1
web.upload.dir: /persistent/flinkData
classloader.resolve-order: parent-first
kubernetes.cluster-id: 222
kubernetes.namespace: flink-mcd
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///persistent/flinkData/checkpoints
jobmanager.archive.fs.dir: file:///persistent/flinkData/completed-jobs
historyserver.archive.fs.refresh-interval: 1
historyserver.archive.fs.dir: file:///persistent/flinkData/completed-jobs
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9090
akka.framesize: 10485760b
flink@flink-jobmanager-657cb5d847-5b579:~$




## Logs from job manager ##
flink@flink-jobmanager-6d644dc78b-6r627:~$ ./bin/flink run 
./examples/streaming/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner 
(file:/opt/flink/lib/flink-dist_2.11-1.12.2.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-06-09 04:59:40,098 INFO  
org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] 
- Stopping 
KubernetesLeaderRetrievalDriver{configMapName='111-restserver-leader'}.
2021-06-09 04:59:40,099 INFO  
org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] 
- The watcher is closing.
Job has been submitted with JobID d36a0b99601dc6af696d213da2f8c159
2021-06-09 05:04:37,877 INFO  
org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] 
- Stopping 
KubernetesLeaderRetrievalDriver{configMapName='111-restserver-leader'}.
2021-06-09 05:04:37,879 INFO  
org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] 
- The watcher is closing.


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: org.apache.flink.client.program.ProgramInvocationException: 
Job failed (JobID: d36a0b99601dc6af696d213da2f8c159)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 

[jira] [Created] (FLINK-22937) rocksdb cause jvm to crash

2021-06-08 Thread Piers (Jira)
Piers created FLINK-22937:

 Summary: rocksdb cause jvm to crash
 Key: FLINK-22937
 URL: https://issues.apache.org/jira/browse/FLINK-22937
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.13.1
 Environment: deployment: native Kubernetes
Reporter: Piers
 Attachments: hs_err_pid1.log

The JVM crashes when running a job. Possibly RocksDB caused this.

The attached hs_err_pid1.log contains the JVM crash log.

Thanks.





Re: [DISCUSS] Definition of idle partitions

2021-06-08 Thread Eron Wright
It seems to me that idleness was introduced to deal with a very specific
issue.  In the pipeline, watermarks are aggregated not on a per-split basis
but on a per-subtask basis.  This works well when each subtask has exactly
one split.  When a sub-task has multiple splits, various complications
occur involving the commingling of watermarks.  And when a sub-task has no
splits, the pipeline stalls altogether.  To deal with the latter problem,
idleness was introduced.  The sub-task simply declares itself to be idle to
be taken out of consideration for purposes of watermark aggregation.

If we're looking for a more general solution, I would suggest we discuss
how to track watermarks on a per-split basis.  Or, as Till mentioned
recently, an alternate solution may be to dynamically adjust the
parallelism of the task.
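
A small sketch of what per-split tracking could look like: each subtask reports the minimum over its own splits, and a subtask without splits reports itself idle so that it is left out of the aggregation. The function names and the dict-based representation are assumptions made for the illustration, not an existing Flink interface.

```python
def subtask_watermark(split_watermarks):
    """Return (status, watermark) for one subtask.

    split_watermarks maps a split id to the latest watermark of that split.
    Hypothetical helper for illustration only.
    """
    if not split_watermarks:
        # No splits assigned: declare idleness instead of stalling the pipeline.
        return ("IDLE", None)
    return ("ACTIVE", min(split_watermarks.values()))


def pipeline_watermark(subtasks):
    """Combine subtask watermarks, ignoring idle subtasks."""
    results = [subtask_watermark(splits) for splits in subtasks]
    active = [wm for status, wm in results if status == "ACTIVE"]
    return min(active) if active else None
```

For example, pipeline_watermark([{"s0": 10, "s1": 4}, {}, {"s2": 7}]) ignores the empty second subtask and yields 4.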

I don't agree with the notion that idleness involves a correctness
tradeoff.  The facility I described above has no impact on correctness.
Meanwhile, various watermark strategies rely on heuristics involving the
processing-time domain, and the term idleness seems to have found purchase
there too.  The connection among the concepts seems tenuous.

-E



On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski  wrote:

> Hi Arvid,
>
> Thanks for writing down this summary and proposal. I think this was the
> foundation of the disagreement in FLIP-167 discussion. Dawid was arguing
> that idleness is intermittent, strictly a task local concept and as such
> shouldn't be exposed in for example sinks. While me and Eron thought that
> it's a concept strictly connected to watermarks.
>
> 1. I'm big +1 for changing the StreamStatus definition to stream "providing
> watermark" and "not providing watermark". With respect to that I agree with
> Dawid that record bound idleness *(if we would ever need to define/expose
> it)* should be an intermittent concept, like for example the existing in
> the Task/runtime input availability (StreamTaskInput#isAvailable).
> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
> I also don't have any good ideas.
> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
>
> Best,
> Piotrek
>
> On Tue, 8 Jun 2021 at 16:35, Arvid Heise wrote:
>
> > Hi devs,
> >
> > While discussing "Watermark propagation with Sink API" and during
> > "[FLINK-18934] Idle stream does not advance watermark in connected
> stream",
> > we noticed some drawbacks on how Flink defines idle partitions currently.
> >
> > To recap, idleness was always considered as a means to achieve progress
> in
> > window operators with idle partition in the source at the risk of losing
> a
> > bit of correctness. In particular, records could be considered late,
> simply
> > because of that idleness timeout and not because they arrived out of
> order.
> > A potential reprocessing would not be causing these records to be
> > considered late and we may end up with a different (correct) result.
> >
> > The drawbacks that we discovered are as follows:
> > - We currently only use idleness to exclude respective upstream tasks
> from
> > participating in watermark generation.
> > - However, the definition is bound to records. [1] In particular, while a
> > partition is idle, no records should be produced.
> > - That brings us into quite a few edge cases, where operators emit
> records,
> > while they are actually idling: Think of timers, asyncIO operators,
> window
> > operators based on timeouts, etc. that trigger on an operator ingesting
> an
> > idle partition.
> > - The proper solution would be to turn the operator active while emitting
> > and to return to being idle afterwards (but when?). However, this has
> some
> > unintended side-effects depending on when you switch back:
> >   - If you toggle stream status for each record, you get a huge overhead
> on
> > stream status records and quite a bit of processing in downstream
> operators
> > (that code path is not much optimized since switching is considered a
> rare
> > thing).
> >   - If you toggle after a certain time, you may get delays>idleness in
> the
> > downstream window operators.
> >   - You could turn back when you processed all pending mails, but if you
> > have a self-replicating mail that would be never. Self-enqueueing, low
> > timer would also produce a flood similar to the first case.
> >
> > All in all, the situation is quite unsatisfying because idleness implies
> no
> > records. However, currently there is no need to have that implication:
> > since we only use it for watermarks, we can easily allow records to be
> > emitted (in fact that was the old behavior before FLINK-18934 in many
> > cases) and still get the intended behavior in respect to watermarks:
> > - A channel that is active is providing watermarks.
> > - An idle channel is not providing any watermarks but can deliver
> records.
> >
> > Ultimately, that would mean that we are actually not talking idle/active
> > partitions anymore. We are talking more about whether a particular
> 

Re: Add control mode for flink

2021-06-08 Thread Xintong Song
>
> 2. There are two kinds of existing special elements: special stream
> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
> flow through the whole DAG, but events need to be acknowledged by
> downstream and can overtake records, while this is not the case for stream
> records. So I’m wondering if we plan to unify the two approaches in the new
> control flow (as Xintong mentioned both in the previous mails)?
>

TBH, I don't really know yet. We feel that the control flow is a
non-trivial topic and it would be better to bring it up publicly as early
as possible, while the concrete plan is still on the way.

Personally, I'm leaning towards not touching the existing watermarks and
checkpoint barriers in the first step.
- I'd expect the control flow to be introduced as an experimental feature
that takes time to stabilize. It would be better that the existing
important features like checkpointing and watermarks stay unaffected.
- Checkpoint barriers are a little different, as other control messages
somehow rely on them to achieve exactly-once consistency. Without the
concrete design, I'm not entirely sure whether they can be properly modeled
as a special case of general control messages.
- Watermarks are probably similar to the other control messages. However,
they are already exposed to users as public APIs. If we want to migrate them
to the new control flow, we'd need to be very careful not to break any
compatibility.


Thank you~

Xintong Song



On Wed, Jun 9, 2021 at 11:30 AM Steven Wu  wrote:

> > producing control events from JobMaster is similar to triggering a
> savepoint.
>
> Paul, here is where I see the difference. Upon job or jobmanager recovery,
> we don't need to recover and replay the savepoint trigger signal.
>
> On Tue, Jun 8, 2021 at 8:20 PM Paul Lam  wrote:
>
>> +1 for this feature. Setting up a separate control stream is too much for
>> many use cases; it would be very helpful if users could leverage the
>> built-in control flow of Flink.
>>
>> My 2 cents:
>> 1. @Steven IMHO, producing control events from JobMaster is similar to
>> triggering a savepoint. The REST API is non-blocking, and users should poll
>> the results to confirm that the operation has succeeded. If something goes
>> wrong, it’s the user’s responsibility to retry.
>> 2. There are two kinds of existing special elements: special stream
>> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
>> flow through the whole DAG, but events need to be acknowledged by
>> downstream and can overtake records, while this is not the case for stream
>> records. So I’m wondering if we plan to unify the two approaches in the new
>> control flow (as Xintong mentioned both in the previous mails)?
>>
>> Best,
>> Paul Lam
>>
>> On June 8, 2021, at 14:08, Steven Wu wrote:
>>
>>
>> I can see the benefits of control flow. E.g., it might help the old (and
>> inactive) FLIP-17 side input. I would suggest that we add more details of
>> some of the potential use cases.
>>
>> Here is one mismatch with using control flow for dynamic config. Dynamic
>> config is typically targeted/loaded by one specific operator. Control flow
>> will propagate the dynamic config to all operators. not a problem per se
>>
>> Regarding using the REST api (to jobmanager) for accepting control
>> signals from external system, where are we going to persist/checkpoint the
>> signal? jobmanager can die before the control signal is propagated and
>> checkpointed. Did we lose the control signal in this case?
>>
>>
>> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song 
>> wrote:
>>
>>> +1 on separating the effort into two steps:
>>>
>>>1. Introduce a common control flow framework, with flexible
>>>interfaces for generating / reacting to control messages for various
>>>purposes.
>>>2. Features that leverage the control flow can be worked on
>>>concurrently
>>>
>>> Meantime, keeping collecting potential features that may leverage the
>>> control flow should be helpful. It provides good inputs for the control
>>> flow framework design, to make the framework common enough to cover the
>>> potential use cases.
>>>
>>> My suggestions on the next steps:
>>>
>>>1. Allow more time for opinions to be heard and potential use cases
>>>to be collected
>>>2. Draft a FLIP with the scope of common control flow framework
>>>3. We probably need a poc implementation to make sure the framework
>>>covers at least the following scenarios
>>>   1. Produce control events from arbitrary operators
>>>   2. Produce control events from JobMaster
>>>   3. Consume control events from arbitrary operators downstream
>>>   where the events are produced
>>>
>>>
>>> Thank you~
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  wrote:
>>>
>>>> Many thanks to Jiangang for bringing this up, and many thanks for the
>>>> discussion!
>>>>
>>>> I also agree with the summary by Xintong and Jing that control
>>>> flow seems to be
>>>> a common building block for many functionalities 

Re: Add control mode for flink

2021-06-08 Thread Steven Wu
> producing control events from JobMaster is similar to triggering a
savepoint.

Paul, here is where I see the difference. Upon job or jobmanager recovery,
we don't need to recover and replay the savepoint trigger signal.

On Tue, Jun 8, 2021 at 8:20 PM Paul Lam  wrote:

> +1 for this feature. Setting up a separate control stream is too much for
> many use cases; it would be very helpful if users could leverage the
> built-in control flow of Flink.
>
> My 2 cents:
> 1. @Steven IMHO, producing control events from JobMaster is similar to
> triggering a savepoint. The REST API is non-blocking, and users should poll
> the results to confirm that the operation has succeeded. If something goes
> wrong, it’s the user’s responsibility to retry.
> 2. There are two kinds of existing special elements: special stream
> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
> flow through the whole DAG, but events need to be acknowledged by
> downstream and can overtake records, while this is not the case for stream
> records. So I’m wondering if we plan to unify the two approaches in the new
> control flow (as Xintong mentioned both in the previous mails)?
>
> Best,
> Paul Lam
>
> On June 8, 2021, at 14:08, Steven Wu wrote:
>
>
> I can see the benefits of control flow. E.g., it might help the old (and
> inactive) FLIP-17 side input. I would suggest that we add more details of
> some of the potential use cases.
>
> Here is one mismatch with using control flow for dynamic config. Dynamic
> config is typically targeted/loaded by one specific operator. Control flow
> will propagate the dynamic config to all operators. not a problem per se
>
> Regarding using the REST api (to jobmanager) for accepting control
> signals from external system, where are we going to persist/checkpoint the
> signal? jobmanager can die before the control signal is propagated and
> checkpointed. Did we lose the control signal in this case?
>
>
> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song 
> wrote:
>
>> +1 on separating the effort into two steps:
>>
>>1. Introduce a common control flow framework, with flexible
>>interfaces for generating / reacting to control messages for various
>>purposes.
>>2. Features that leverage the control flow can be worked on
>>concurrently
>>
>> Meantime, keeping collecting potential features that may leverage the
>> control flow should be helpful. It provides good inputs for the control
>> flow framework design, to make the framework common enough to cover the
>> potential use cases.
>>
>> My suggestions on the next steps:
>>
>>1. Allow more time for opinions to be heard and potential use cases
>>to be collected
>>2. Draft a FLIP with the scope of common control flow framework
>>3. We probably need a poc implementation to make sure the framework
>>covers at least the following scenarios
>>   1. Produce control events from arbitrary operators
>>   2. Produce control events from JobMaster
>>   3. Consume control events from arbitrary operators downstream
>>   where the events are produced
>>
>>
>> Thank you~
>> Xintong Song
>>
>>
>>
>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  wrote:
>>
>>> Many thanks to Jiangang for bringing this up, and many thanks for the
>>> discussion!
>>>
>>> I also agree with the summary by Xintong and Jing that control flow
>>> seems to be a common building block for many functionalities, and that
>>> a dynamic configuration framework is a representative application that
>>> is frequently required by users. Regarding the control flow, we are
>>> currently also considering the design of iteration for flink-ml, and as
>>> Xintong has pointed out, it also requires control flow in cases like
>>> detecting global termination inside the iteration (in this case we need
>>> to broadcast an event through the iteration body to detect whether
>>> there are still records residing in the iteration body). Regarding
>>> whether to implement the dynamic configuration framework, I also agree
>>> with Xintong that the consistency guarantee would be a point to
>>> consider; we might consider whether we need to ensure that every
>>> operator can receive the dynamic configuration.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>> --
>>> Sender:kai wang
>>> Date:2021/06/08 11:52:12
>>> Recipient:JING ZHANG
>>> Cc:刘建刚; Xintong Song [via Apache Flink User
>>> Mailing List archive.]; user<
>>> u...@flink.apache.org>; dev
>>> Theme:Re: Add control mode for flink
>>>
>>>
>>>
>>> I'm big +1 for this feature.
>>>
>>>1. Limit the input qps.
>>>2. Change log level for debug.
>>>
>>> in my team, the two examples above are needed
>>>
>>> On Tue, Jun 8, 2021 at 11:18 AM, JING ZHANG wrote:
>>>
>>>> Thanks Jiangang for bringing this up.
>>>> As mentioned in Jiangang's email, `dynamic configuration framework`
>>>> provides many useful functions in Kuaishou, because it could update job
>>>> behavior without relaunching the job. The functions are 

Re: Add control mode for flink

2021-06-08 Thread Paul Lam
+1 for this feature. Setting up a separate control stream is too much for many 
use cases; it would be very helpful if users could leverage the built-in 
control flow of Flink.

My 2 cents:
1. @Steven IMHO, producing control events from JobMaster is similar to 
triggering a savepoint. The REST API is non-blocking, and users should poll the 
results to confirm that the operation has succeeded. If something goes wrong, 
it’s the user’s responsibility to retry.
2. There are two kinds of existing special elements: special stream records 
(e.g. watermarks) and events (e.g. checkpoint barriers). They all flow through 
the whole DAG, but events need to be acknowledged by downstream and can 
overtake records, while this is not the case for stream records. So I’m 
wondering if we plan to unify the two approaches in the new control flow (as 
Xintong mentioned both in the previous mails)?
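
To illustrate point 1, a minimal client-side sketch of the trigger-then-poll pattern. The fetch_status callable and the status strings are placeholders invented for the example; they do not correspond to an existing Flink REST endpoint for control events.

```python
import time


def wait_for_completion(fetch_status, attempts=10, delay_s=0.0):
    """Poll a non-blocking operation until it reaches a terminal state.

    fetch_status is a hypothetical callable returning "IN_PROGRESS",
    "COMPLETED" or "FAILED"; these names are assumptions for this sketch.
    """
    for _ in range(attempts):
        status = fetch_status()
        if status in ("COMPLETED", "FAILED"):
            return status
        time.sleep(delay_s)
    # Giving up is the caller's cue to retry, as with savepoint triggers.
    return "TIMED_OUT"
```

The caller decides what to do on "FAILED" or "TIMED_OUT", which matches the idea that retrying is the user's responsibility.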

Best,
Paul Lam

> On June 8, 2021, at 14:08, Steven Wu wrote:
> 
> 
> I can see the benefits of control flow. E.g., it might help the old (and 
> inactive) FLIP-17 side input. I would suggest that we add more details of 
> some of the potential use cases.
> 
> Here is one mismatch with using control flow for dynamic config. Dynamic 
> config is typically targeted/loaded by one specific operator. Control flow 
> will propagate the dynamic config to all operators. not a problem per se 
> 
> Regarding using the REST api (to jobmanager) for accepting control signals 
> from external system, where are we going to persist/checkpoint the signal? 
> jobmanager can die before the control signal is propagated and checkpointed. 
> Did we lose the control signal in this case?
> 
> 
> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song wrote:
> +1 on separating the effort into two steps:
> 
>    1. Introduce a common control flow framework, with flexible interfaces for 
>    generating / reacting to control messages for various purposes.
>    2. Features that leverage the control flow can be worked on concurrently
> 
> Meantime, keeping collecting potential features that may leverage the control 
> flow should be helpful. It provides good inputs for the control flow 
> framework design, to make the framework common enough to cover the potential 
> use cases.
> 
> My suggestions on the next steps:
> 
>    1. Allow more time for opinions to be heard and potential use cases to be 
>    collected
>    2. Draft a FLIP with the scope of common control flow framework
>    3. We probably need a poc implementation to make sure the framework covers 
>    at least the following scenarios
>       1. Produce control events from arbitrary operators
>       2. Produce control events from JobMaster
>       3. Consume control events from arbitrary operators downstream where the 
>       events are produced
> 
> Thank you~
> Xintong Song
> 
> 
> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao wrote:
> Many thanks to Jiangang for bringing this up, and many thanks for the 
> discussion! 
> 
> I also agree with the summary by Xintong and Jing that control flow seems to 
> be a common building block for many functionalities, and that a dynamic 
> configuration framework is a representative application that is frequently 
> required by users. Regarding the control flow, we are currently also 
> considering the design of iteration for flink-ml, and as Xintong has pointed 
> out, it also requires control flow in cases like detecting global termination 
> inside the iteration (in this case we need to broadcast an event through the 
> iteration body to detect whether there are still records residing in the 
> iteration body). Regarding whether to implement the dynamic configuration 
> framework, I also agree with Xintong that the consistency guarantee would be 
> a point to consider; we might consider whether we need to ensure that every 
> operator can receive the dynamic configuration. 
> 
> Best,
> Yun
> 
> 
> 
> --
> Sender:kai wang <yiduwang...@gmail.com>
> Date:2021/06/08 11:52:12
> Recipient:JING ZHANG <beyond1...@gmail.com>
> Cc:刘建刚 <liujiangangp...@gmail.com>; Xintong Song [via Apache Flink User 
> Mailing List archive.]; user; dev
> Theme:Re: Add control mode for flink
> 
> 
> 
> I'm big +1 for this feature. 
> 
>    1. Limit the input qps.
>    2. Change log level for debug.
> 
> in my team, the two examples above are needed
> 
> On Tue, Jun 8, 2021 at 11:18 AM, JING ZHANG <beyond1...@gmail.com> wrote:
> Thanks Jiangang for bringing this up. 
> As mentioned in Jiangang's email, `dynamic configuration framework` provides 
> many useful functions in Kuaishou, because it could update job behavior 
> without relaunching the job. The functions are very popular in Kuaishou, we 
> also see similar demands in maillist [1].
> 
> I'm big +1 for this feature.
> 
> Thanks Xintong and Yun for deep thoughts about the issue. I like the idea 
> about introducing control mode in 

Re: Re: Add control mode for flink

2021-06-08 Thread Steven Wu
Option 2 is probably not feasible, as a checkpoint may take a long time or
may fail.

Option 1 might work, although it complicates job recovery and
checkpointing. After checkpoint completion, we would need to clean up those
control signals stored in the HA service.
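
A rough sketch of the bookkeeping option 1 implies: signals are appended to a durable log, replayed after failover, and pruned once a completed checkpoint covers them. The in-memory list only stands in for the HA service, and the class and method names are made up for this illustration.

```python
class ControlSignalLog:
    """Toy stand-in for control signals persisted in the HA service.

    Hypothetical design for illustration; not an existing Flink component.
    """

    def __init__(self):
        self._pending = []  # (signal_id, payload) pairs not yet checkpointed
        self._next_id = 0

    def append(self, payload):
        # Persist the signal before acknowledging it to the external caller.
        signal_id = self._next_id
        self._next_id += 1
        self._pending.append((signal_id, payload))
        return signal_id

    def replay(self):
        # After failover: re-emit every signal not covered by a checkpoint.
        return [payload for _, payload in self._pending]

    def on_checkpoint_complete(self, last_signal_id):
        # Clean up signals whose effect is now part of checkpointed state.
        self._pending = [(i, p) for i, p in self._pending if i > last_signal_id]
```

The extra complexity mentioned above shows up in on_checkpoint_complete: each checkpoint has to record the id of the last signal it includes so that the cleanup is safe.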

On Tue, Jun 8, 2021 at 1:14 AM 刘建刚  wrote:

> Thanks for the reply. It is a good question. There are multiple choices, as
> follows:
>
>    1. We can persist control signals in HighAvailabilityServices and replay
>    them after failover.
>    2. Only tell the users that the control signals take effect after they
>    are checkpointed.
>
>
> On Tue, Jun 8, 2021 at 2:15 PM, Steven Wu [via Apache Flink User Mailing
> List archive.] <ml+s2336050n44278...@n4.nabble.com> wrote:
>
> >
> > I can see the benefits of control flow. E.g., it might help the old (and
> > inactive) FLIP-17 side input. I would suggest that we add more details of
> > some of the potential use cases.
> >
> > Here is one mismatch with using control flow for dynamic config. Dynamic
> > config is typically targeted at/loaded by one specific operator, whereas
> > control flow will propagate the dynamic config to all operators. That is
> > not a problem per se.
> >
> > Regarding using the REST API (to the JobManager) for accepting control
> > signals from external systems, where are we going to persist/checkpoint
> > the signal? The JobManager can die before the control signal is propagated
> > and checkpointed. Do we lose the control signal in this case?
> >
> >
> > On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]> wrote:
> >
> >> +1 on separating the effort into two steps:
> >>
> >>1. Introduce a common control flow framework, with flexible
> >>interfaces for generating / reacting to control messages for various
> >>purposes.
> >>2. Features that leverage the control flow can be worked on
> >>concurrently
> >>
> >> Meanwhile, continuing to collect potential features that may leverage the
> >> control flow should be helpful. It provides good input for the control
> >> flow framework design, to make the framework general enough to cover the
> >> potential use cases.
> >>
> >> My suggestions on the next steps:
> >>
> >>1. Allow more time for opinions to be heard and potential use cases
> >>to be collected
> >>2. Draft a FLIP with the scope of common control flow framework
> >>3. We probably need a poc implementation to make sure the framework
> >>covers at least the following scenarios
> >>   1. Produce control events from arbitrary operators
> >>   2. Produce control events from JobMaster
> >>   3. Consume control events from arbitrary operators downstream
> >>   where the events are produced
> >>
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
> >>
> >>> Many thanks to Jiangang for bringing this up, and thanks for the
> >>> discussion!
> >>>
> >>> I also agree with the summary by Xintong and Jing that control flow
> >>> seems to be a common building block for many functionalities, and that
> >>> the dynamic configuration framework is a representative application
> >>> that is frequently required by users. Regarding the control flow, we are
> >>> currently also considering the design of iteration for flink-ml, and
> >>> as Xintong has pointed out, it also requires the control flow in cases
> >>> like detecting global termination inside the iteration (in this case we
> >>> need to broadcast an event through the iteration body to detect whether
> >>> there are still records residing in the iteration body). Regarding
> >>> whether to implement the dynamic configuration framework, I also agree
> >>> with Xintong that the consistency guarantee would be a point to
> >>> consider; we might consider whether we need to ensure that every
> >>> operator can receive the dynamic configuration.
> >>>
> >>> Best,
> >>> Yun
> >>>
> >>>
> >>>
> >>> --
> >>> Sender: kai wang <[hidden email]>
> >>> Date: 2021/06/08 11:52:12
> >>> Recipient: JING ZHANG <[hidden email]>
> >>> Cc: 刘建刚 <[hidden email]>; Xintong Song [via Apache Flink User
> >>> Mailing List archive.] <[hidden email]>; user <[hidden email]>;
> >>> dev <[hidden email]>
> >>> Subject: Re: Add control mode for flink
> >>>
> >>>
> >>>
> >>> A big +1 from me for this feature.
> >>>
> >>>1. Limit the input QPS.
> >>>2. Change the log level for debugging.
> >>>
> >>> In my team, both of the examples above are needed.
> >>>
> >>> JING ZHANG <[hidden email]
> >>> 

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

2021-06-08 Thread Yangze Guo
Thanks for the valuable suggestion, Arvid.

1) Yes, we can add a new SlotSharingGroup class which includes the name and
its resources. After that, we have two interfaces for configuring the
slot sharing group of an operator:
- #slotSharingGroup(String name) // its resources can be configured
through StreamExecutionEnvironment#registerSlotSharingGroup
- #slotSharingGroup(SlotSharingGroup ssg) // directly configures the resources
And one interface to configure the resources of an SSG:
- StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
We can also define the priority of the above two approaches, e.g. the
resources registered in the StreamExecutionEnvironment will always take
precedence in case of a conflict. That would be well documented.

2) Yes, I originally added this interface as a shortcut. It seems
unnecessary now, so I will remove it.

3) I don't think we need to expose the ExternalResource. In the
builder of SlotSharingGroup, we can introduce a
#withExternalResource(String name, double value). Also, this interface
needs to be annotated as evolving.

4) Actually, I've mentioned it in the FLIP. Maybe it would be good to
elaborate on the Builder for the SlotSharingGroup.
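
As a starting point for that elaboration, a builder along the following lines might work. This is only a sketch: the class name `SlotSharingGroupSketch` and the method `withCpuCores` are hypothetical, and only `withExternalResource(String name, double value)` is taken from point 3 above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a slot sharing group with resources; not the final API. */
final class SlotSharingGroupSketch {
    private final String name;
    private final double cpuCores;
    private final Map<String, Double> externalResources;

    private SlotSharingGroupSketch(Builder builder) {
        this.name = builder.name;
        this.cpuCores = builder.cpuCores;
        this.externalResources =
                Collections.unmodifiableMap(new HashMap<>(builder.externalResources));
    }

    static Builder newBuilder(String name) {
        return new Builder(name);
    }

    String getName() {
        return name;
    }

    double getCpuCores() {
        return cpuCores;
    }

    Map<String, Double> getExternalResources() {
        return externalResources;
    }

    static final class Builder {
        private final String name;
        private double cpuCores;
        private final Map<String, Double> externalResources = new HashMap<>();

        private Builder(String name) {
            this.name = name;
        }

        Builder withCpuCores(double cpuCores) {
            if (cpuCores <= 0) {
                throw new IllegalArgumentException("cpuCores must be positive");
            }
            this.cpuCores = cpuCores;
            return this;
        }

        // The external resource is exposed only as a (name, value) pair,
        // so no ExternalResource class has to become public.
        Builder withExternalResource(String name, double value) {
            externalResources.put(name, value);
            return this;
        }

        SlotSharingGroupSketch build() {
            return new SlotSharingGroupSketch(this);
        }
    }

    public static void main(String[] args) {
        SlotSharingGroupSketch ssg =
                SlotSharingGroupSketch.newBuilder("ssg-1")
                        .withCpuCores(4)
                        .withExternalResource("gpu", 1)
                        .build();
        System.out.println(ssg.getName() + " " + ssg.getCpuCores());
    }
}
```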

WDYT?

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise  wrote:
>
> Hi Yangze,
>
> I like the general approach to bind requirements to slotsharing groups. I
> think the current approach is also flexible enough that a user could simply
> use ParameterTool or similar to use config values and wire that with their
> slotgroups, such that different requirements can be tested without
> recompilation. So I don't see an immediate need to provide a generic
> solution for yaml configuration for now.
>
> Looking at the programmatic interface though, I think we could improve by
> quite a bit and I haven't seen these alternatives being considered in the
> FLIP:
> 1) Add new class SlotSharingGroup that incorporates all ResourceSpec
> properties. Instead of using group names, the user could directly configure
> such an object.
>
> SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name could also be omitted and auto-generated
> ssg1.setCPUCores(4);
> ...
> DataStream<Tuple2<String, Integer>> grades =
>         GradeSource.getSource(env, rate)
>                 .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
>                 .slotSharingGroup(ssg1);
> DataStream<Tuple2<String, Integer>> salaries =
>         SalarySource.getSource(env, rate)
>                 .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
>                 .slotSharingGroup(ssg2);
>
> // run the actual window join program with the same slot sharing group as grades
> DataStream<Tuple3<String, Integer, Integer>> joinedStream =
>         runWindowJoin(grades, salaries, windowSize).slotSharingGroup(ssg1);
>
> Note that we could make it backward compatible by changing the proposed
> StreamExecutionEnvironment#setSlotSharingGroupResource to
> StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup) and
> then use the string name for further reference.
>
> 2) I'm also not sure about
> StreamExecutionEnvironment#setSlotSharingGroupResources. What's the benefit
> of the Map version over having the simple setter? Even if the user has a
> map slotSharingGroupResources, they could simply call
> slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
>
> 3) Is defining the ExternalResource part of this FLIP? I don't see a
> Public* class yet. I'd also be fine with cutting the scope of this FLIP,
> removing it for now, and annotating ResourceSpec/SlotSharingGroup as evolving.
>
> 4) We should probably use a builder pattern around
> ResourceSpec/SlotSharingGroup as in the current ResourceSpec. I don't think
> we need to fully specify that in the FLIP but it would be good to at least
> say how they should be created by the user.
>
>
>
> On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo  wrote:
>
> > @Yang
> > In short, the external resources will participate in resource
> > deduction and be logically ensured, but requesting an external
> > resource must still be done through config options with the current
> > default resource allocation strategy.
> > In FLIP-56, we abstract the logic of resource allocation to the
> > `ResourceAllocationStrategy`. Currently, with its default
> > implementation, ResourceManager would still allocate TMs with the same
> > resource spec and the external resources of it are configured through
> > the config option as well. So, in your case, you need to define the
> > "external-resources" and "external-resources.disk.amount". Then, all
> > the disk requirements defined in the SSG will be logically ensured, as
> > there is no slot level isolation. If the disk space of a task manager
> > cannot fulfill the disk requirement, RM will allocate a new one.
> > In the future, we'd like to introduce a `ResourceAllocationStrategy`
> > which allocates heterogeneous TMs according to the 

[jira] [Created] (FLINK-22936) Support column comment in Schema and ResolvedSchema

2021-06-08 Thread Jark Wu (Jira)
Jark Wu created FLINK-22936:
---

 Summary: Support column comment in Schema and ResolvedSchema
 Key: FLINK-22936
 URL: https://issues.apache.org/jira/browse/FLINK-22936
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Jark Wu
 Fix For: 1.14.0


In order to support column comments in the catalog (FLINK-18958), we should first 
support column comments in Schema and ResolvedSchema. 

The API is up for discussion. Currently, we already have 10 methods for adding a 
column in {{Schema}}. If we want to support a column comment for each kind of 
column, the number of column methods may double. That is not easy to maintain in 
the long term, and makes the API complex. 

An alternative is adding a new method {{comment(String)}} which applies the 
comment to the previous column. This is not good builder style, but it keeps 
the building concise. 
For example, 

{code}
Schema.newBuilder()
        .column("ts", DataTypes.TIMESTAMP(3)).comment("log timestamp")
        .columnByExpression("proctime", "PROCTIME()").comment("processing time")
        .watermark("ts", "ts - INTERVAL '5' SECOND")
        .build()
{code}
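
A minimal sketch of how a {{comment(String)}} method that applies to the previously added column could behave is shown below. It uses a hypothetical, heavily simplified `SchemaSketch` builder with string-typed columns, not Flink's actual `Schema` class, and the decision to reject a `comment(...)` call with no preceding column is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified builder; not Flink's Schema API. */
final class SchemaSketch {

    static final class Column {
        final String name;
        final String type;
        String comment; // null until comment(...) is called

        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    static final class Builder {
        private final List<Column> columns = new ArrayList<>();

        Builder column(String name, String type) {
            columns.add(new Column(name, type));
            return this;
        }

        /** Attaches the comment to the most recently added column. */
        Builder comment(String comment) {
            if (columns.isEmpty()) {
                throw new IllegalStateException(
                        "comment() must follow a column definition");
            }
            columns.get(columns.size() - 1).comment = comment;
            return this;
        }

        List<Column> build() {
            return new ArrayList<>(columns);
        }
    }

    static Builder newBuilder() {
        return new Builder();
    }

    public static void main(String[] args) {
        List<Column> columns =
                SchemaSketch.newBuilder()
                        .column("ts", "TIMESTAMP(3)").comment("log timestamp")
                        .column("user_id", "BIGINT")
                        .build();
        for (Column c : columns) {
            System.out.println(c.name + " " + c.type + " " + c.comment);
        }
    }
}
```

The sketch also illustrates why this is not ideal builder style: {{comment}} depends on call order, so misuse can only be detected at runtime.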



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Limit size of already processed files in File Source SplitEnumerator

2021-06-08 Thread Tianxin Zhao
Thanks Till, Guowei and Arvid for the insightful discussion!

   1. Regarding size and scan performance

   We are in the POC stage and have not hit an OOM issue yet; the issue was
   discovered by reading through the FileSource implementation. Our order of
   magnitude is about 200 B per path and ~8000 files per day, so this set would
   reach 1 GB in 1.5+ years. The file path has an S3 TTL on it, so scanning could
   be trivial, but in the current Flink FileSource implementation the set of
   pathsAlreadyProcessed grows continuously regardless of files being removed
   from the path.
   2. Regarding the data structure for file deduplication

   I like Guowei's idea of maintaining one max timestamp and a list of
   files corresponding to the max timestamp only. However, Arvid's point on
   pruning and the example "Consider a file X with modification time M that is
   only visible in the eventual consistent filesystem at time M2" show that
   relying only on the timestamp is not enough. Ignoring files whose
   modification time is before the max timestamp could end up ignoring files
   in edge cases.

   To be accurate, we still want to decide whether a file has been processed by
   checking against the set of all recently processed files. To keep the set
   from growing infinitely, we prune it at every checkpoint, only removing
   entries that have passed a user-specified TTL (which should be in days/months
   granularity). Only files that the user decides not to process because their
   modification time is older than (watermark - TTL), and which therefore do not
   need this set for deduplication, would be removed from the set. Thus a Map of  makes sense to me, but I would like to hear your feedback.
   3. Regarding where the deduplication logic lies

   In general, it might actually be best to move the decision completely into user
   code land. Maybe we should change FileEnumerator#enumerateSplits to not
   return duplicates and allow it to be stateful.

   This makes a lot of sense to me. The dedup logic seems to be related to
   FileEnumerator only; the current Hive subclasses do not make use of
   alreadyProcessedPaths in the Checkpoint. We could probably move the
   dedup map and watermark to the file Checkpoint only. Then we would have the
   PruningEnumerator be an extension of FileEnumerator.
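
The TTL-based pruning from point 2 could be sketched roughly as follows. This is a rough illustration under stated assumptions: the map is assumed to go from file path to modification time, the "watermark" is taken to be the largest modification time seen so far, and `ProcessedFilesSketch` is a hypothetical class, not part of the FileSource.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical dedup state: path -> modification time, pruned by TTL. */
final class ProcessedFilesSketch {
    private final Map<String, Long> processed = new HashMap<>();
    private final long ttlMillis;
    private long watermark = Long.MIN_VALUE; // largest modification time seen

    ProcessedFilesSketch(long ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("ttlMillis must be non-negative");
        }
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if the file should be read, and records it as processed. */
    boolean shouldProcess(String path, long modificationTime) {
        // Files older than (watermark - TTL) are skipped by user choice,
        // so they never need to be looked up in the map.
        if (watermark != Long.MIN_VALUE && modificationTime < watermark - ttlMillis) {
            return false;
        }
        if (processed.containsKey(path)) {
            return false;
        }
        processed.put(path, modificationTime);
        watermark = Math.max(watermark, modificationTime);
        return true;
    }

    /** Called at every checkpoint: forget entries older than (watermark - TTL). */
    void pruneOnCheckpoint() {
        if (watermark == Long.MIN_VALUE) {
            return;
        }
        long cutoff = watermark - ttlMillis;
        processed.values().removeIf(modificationTime -> modificationTime < cutoff);
    }

    int trackedFiles() {
        return processed.size();
    }

    public static void main(String[] args) {
        ProcessedFilesSketch state = new ProcessedFilesSketch(100L);
        state.shouldProcess("s3://bucket/a", 1000L);
        state.shouldProcess("s3://bucket/b", 1200L);
        state.pruneOnCheckpoint();
        System.out.println(state.trackedFiles());
    }
}
```

Note that this sketch does not address the eventual-consistency and time-drift concerns raised in the quoted message below: a file that becomes visible only after the cutoff has passed its modification time would still be skipped.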

Thanks!

On Tue, Jun 8, 2021 at 4:57 AM Arvid Heise  wrote:

> Hi Tianxin,
>
> I assigned you the ticket, so you could go ahead and create some POC PR. I
> would like to understand the issue first a bit better and then give some
> things to consider. In general, I see your point that in a potentially
> infinitely running application keeping track of all read entities will
> eventually lead to an OOM.
>
> 1. About which order of magnitude are we talking? Your average path should
> be less than 1kb, so 1 million paths, is 1 GB. I'm assuming you have hit
> that limit already and see memory issues on the job manager?
> 2. At that point, do we need to improve the scanning already? How long does
> that take?
> 3. Doesn't NonSplittingRecursiveEnumerator#enumerateSplits always return a
> full collection of files as well? So isn't the problem appearing there as
> well?
>
> For pruning, I'm not too sure. I fear that we always run into issues with
> filesystems that are not behaving as well as the local filesystem.
> 4. If we want to prune the set as you suggested, how can we be sure that we
> are not ignoring any file that is supposed to be processed?
> Consider a file X with modification time M that is only visible in the
> eventual consistent filesystem at time M2. If we have a checkpoint between
> M and M2, wouldn't we also prune X, even though it has not been read yet?
> 5. With all time-based proposals: what happens if you have any kind of time
> drift?
>
> As an alternative
> 6. Couldn't we move the information fully to a state backend, such that
> when the application grows, we can use the disk as well? However, I also
> saw that we currently do not allow user state in sources at all.
> 7. We could also compress the information (hash?). But that's probably just
> buying us a little time.
>
> In general, it might actually be best to move the decision completely into
> user code land. Maybe we should change FileEnumerator#enumerateSplits to
> not return duplicates and allow it to be stateful. Then the user can make a
> conscious decision based on the used filesystem. We could provide pruning
> enumerator implementations and the user just has to pick the appropriate
> one if the default one doesn't work anymore.
>
>
> On Tue, Jun 8, 2021 at 12:54 PM Guowei Ma  wrote:
>
> > It would really simplify a lot if the modification timestamps of newly
> > scanned files were monotonically increasing.
> >
> >  We only need to record the file list corresponding to the largest
> > timestamp.  For the timestamp of each scanned file:
> >  1. It is smaller than the maximum timestamp, which means it has been
> > processed;
> >  2. The timestamps are equal, so you need to see if it is in this file
> > list, if it 

[jira] [Created] (FLINK-22935) Can not start standalone cluster

2021-06-08 Thread Lyubing Qiang (Jira)
Lyubing Qiang created FLINK-22935:
-

 Summary: Can not start standalone cluster 
 Key: FLINK-22935
 URL: https://issues.apache.org/jira/browse/FLINK-22935
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.11.2
 Environment: centos
Reporter: Lyubing Qiang
 Attachments: image-2021-06-09-10-05-21-481.png, 
image-2021-06-09-10-06-08-418.png, image-2021-06-09-10-06-48-825.png

I tried to start the standalone mode (configured following the instructions) 
with 4 slaves and a master. When I ran start-cluster.sh, it seemed that all 4 
slaves were started and running, yet there are actually only 3 taskmanagers and 
all of them are on the master machine. Also, the number of task slots 
deviates from the settings in flink-conf.yaml.

!image-2021-06-09-10-05-21-481.png!

!image-2021-06-09-10-06-08-418.png!

!image-2021-06-09-10-06-48-825.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Request to open the contributor permission!

2021-06-08 Thread Yangze Guo
Hi,

Welcome to the community!
You don't need a contributor's permission to contribute to Apache
Flink. Simply find a JIRA ticket you'd like to work on and ask a
committer to assign you to the ticket. You can refer to the
contribution guidelines [1].

[1] https://flink.apache.org/contributing/how-to-contribute.html

Best,
Yangze Guo

On Wed, Jun 9, 2021 at 12:53 AM w.gh123  wrote:
>
> Hi,
>
>
> I want to contribute to Apache Flink. Would you please give me the 
> contributor permission? My JIRA ID is hapihu


[jira] [Created] (FLINK-22934) Add instructions for using the " ' " escape syntax of SQL client

2021-06-08 Thread Roc Marshal (Jira)
Roc Marshal created FLINK-22934:
---

 Summary: Add instructions for using the " ' " escape syntax of SQL 
client
 Key: FLINK-22934
 URL: https://issues.apache.org/jira/browse/FLINK-22934
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Table SQL / Client
Affects Versions: 1.13.1, 1.13.0
Reporter: Roc Marshal






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22933) Upgrade the Flink Fabric8io/kubernetes-client version to >=5.4.0 to be FIPS compliant

2021-06-08 Thread Fuyao Li (Jira)
Fuyao Li created FLINK-22933:


 Summary: Upgrade the Flink Fabric8io/kubernetes-client version to 
>=5.4.0 to be FIPS compliant
 Key: FLINK-22933
 URL: https://issues.apache.org/jira/browse/FLINK-22933
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.13.1, 1.13.0
Reporter: Fuyao Li


The current Fabric8io version in Flink is 4.9.2

See link: 
[https://github.com/apache/flink/blob/master/flink-kubernetes/pom.xml#L35]

This version of the Fabric8io library is not FIPS compliant 
([https://www.sdxcentral.com/security/definitions/what-does-mean-fips-compliant/]).

Support for this has been added in Fabric8io.

[https://github.com/fabric8io/kubernetes-client/pull/2788]
[https://github.com/fabric8io/kubernetes-client/issues/2732]

 

I am trying to write a native kubernetes operator leveraging APIs and 
interfaces provided by Flink source code. For example, ApplicationDeployer.

I am writing my own implementation based on Yang's example code: 
https://github.com/wangyang0918/flink-native-k8s-operator

 

Using version 4.9.2 for my operator works perfectly, but it could cause 
FIPS compliance issues.

Using version 5.4.0 runs into issues since the Fabric8io version 4 and version 
5 APIs are not compatible. I saw the errors below.
{code:java}
Exception in thread "main" java.lang.AbstractMethodError: Receiver class io.fabric8.kubernetes.client.handlers.ServiceHandler does not define or inherit an implementation of the resolved method 'abstract java.lang.Object create(okhttp3.OkHttpClient, io.fabric8.kubernetes.client.Config, java.lang.String, java.lang.Object, boolean)' of interface io.fabric8.kubernetes.client.ResourceHandler.
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.lambda$createOrReplaceItem$0(CreateOrReplaceHelper.java:77)
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplace(CreateOrReplaceHelper.java:56)
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplaceItem(CreateOrReplaceHelper.java:91)
    at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplaceOrDeleteExisting(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:454)
    at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:297)
    at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:66)
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.createJobManagerComponent(Fabric8FlinkKubeClient.java:113)
    at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployClusterInternal(KubernetesClusterDescriptor.java:274)
    at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployApplicationCluster(KubernetesClusterDescriptor.java:208)
    at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
    at org.apache.flink.kubernetes.operator.controller.FlinkApplicationController.reconcile(FlinkApplicationController.java:207)
    at org.apache.flink.kubernetes.operator.controller.FlinkApplicationController.run(FlinkApplicationController.java:172)
    at org.apache.flink.kubernetes.operator.KubernetesOperatorEntrypoint.main(KubernetesOperatorEntrypoint.java:74)
2021-06-08 19:53:57,726 WARN  io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener [] - Exec Failure
{code}
Since Fabric8io has fixed this issue, maybe we can bump the version in Flink 
to 5.4.0 or the recently released 5.4.1?

This will also bring additional benefits, since users will be able to leverage 
the new APIs provided in versions >=5.4.0 if anyone needs to build operators 
on top of this.

See changelog: 
[https://github.com/fabric8io/kubernetes-client/blob/master/CHANGELOG.md]

 

cc [~fly_in_gis] [~rmetzger]

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Watermark propagation with Sink API

2021-06-08 Thread Eron Wright
Voting is re-open for FLIP-167 as-is (without idleness support as was the
point of contention).

On Fri, Jun 4, 2021 at 10:45 AM Eron Wright  wrote:

> Little update on this, more good discussion over the last few days, and
> the FLIP will probably be amended to incorporate idleness.   Voting will
> remain open until, let's say, mid-next week.
>
> On Thu, Jun 3, 2021 at 8:00 AM Piotr Nowojski 
> wrote:
>
>> I would like to ask you to hold on with counting the votes until I get an
>> answer for my one question in the dev mailing list thread (sorry if it was
>> already covered somewhere).
>>
>> Best, Piotrek
>>
>> On Thu, Jun 3, 2021 at 16:12, Jark Wu wrote:
>>
>> > +1 (binding)
>> >
>> > Best,
>> > Jark
>> >
>> > On Thu, 3 Jun 2021 at 21:34, Dawid Wysakowicz 
>> > wrote:
>> >
>> > > +1 (binding)
>> > >
>> > > Best,
>> > >
>> > > Dawid
>> > >
>> > > On 03/06/2021 03:50, Zhou, Brian wrote:
>> > > > +1 (non-binding)
>> > > >
>> > > > Thanks Eron, looking forward to seeing this feature soon.
>> > > >
>> > > > Thanks,
>> > > > Brian
>> > > >
>> > > > -Original Message-
>> > > > From: Arvid Heise 
>> > > > Sent: Wednesday, June 2, 2021 15:44
>> > > > To: dev
>> > > > Subject: Re: [VOTE] Watermark propagation with Sink API
>> > > >
>> > > >
>> > > > [EXTERNAL EMAIL]
>> > > >
>> > > > +1 (binding)
>> > > >
>> > > > Thanks Eron for driving this effort; it will open up exciting new
>> > > > use cases.
>> > > >
>> > > > On Tue, Jun 1, 2021 at 7:17 PM Eron Wright wrote:
>> > > >
>> > > >> After some good discussion about how to enhance the Sink API to
>> > > >> process watermarks, I believe we're ready to proceed with a vote.
>> > > >> Voting will be open until at least Friday, June 4th, 2021.
>> > > >>
>> > > >> Reference:
>> > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
>> > > >>
>> > > >> Discussion thread:
>> > > >> https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
>> > > >>
>> > > >> Implementation Issue:
>> > > >> https://issues.apache.org/jira/browse/FLINK-22700
>> > > >>
>> > > >> Thanks,
>> > > >> Eron Wright
>> > > >> StreamNative
>> > > >>
>> > >
>> > >
>> >
>>
>


Re: [DISCUSS] Watermark propagation with Sink API

2021-06-08 Thread Eron Wright
Thanks, the narrowed FLIP-167 is fine for now.  I'll re-activate the vote
process.  Thanks!
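
For readers skimming the thread, the narrowed scope (point 1 of the quoted summary below: tell the sink function when a watermark arrives, with idleness left out) can be sketched as follows. The interface and class names here are hypothetical simplifications and are not Flink's actual sink API; watermarks are represented as plain `long` timestamps.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified sink interface; not Flink's SinkFunction. */
interface WatermarkAwareSink<T> {
    void invoke(T value);

    /** No-op by default, so existing sinks keep working unchanged. */
    default void writeWatermark(long watermarkTimestamp) {}
}

/** Example sink that records everything it is told about. */
class RecordingSink implements WatermarkAwareSink<String> {
    final List<String> log = new ArrayList<>();

    @Override
    public void invoke(String value) {
        log.add("record:" + value);
    }

    @Override
    public void writeWatermark(long watermarkTimestamp) {
        log.add("watermark:" + watermarkTimestamp);
    }
}

/** Hypothetical operator that forwards records and watermarks to the sink. */
class SinkOperatorSketch<T> {
    private final WatermarkAwareSink<T> sink;

    SinkOperatorSketch(WatermarkAwareSink<T> sink) {
        this.sink = sink;
    }

    void processElement(T value) {
        sink.invoke(value);
    }

    void processWatermark(long watermarkTimestamp) {
        sink.writeWatermark(watermarkTimestamp);
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        SinkOperatorSketch<String> operator = new SinkOperatorSketch<>(sink);
        operator.processElement("a");
        operator.processWatermark(42L);
        System.out.println(sink.log);
    }
}
```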

On Tue, Jun 8, 2021 at 3:01 AM Till Rohrmann  wrote:

> Hi everyone,
>
> I do agree that Flink's definition of idleness is not fully thought through
> yet. Consequently, I would feel a bit uneasy to make it part of Flink's API
> right now. Instead, defining the proper semantics first and then exposing
> it sounds like a good approach forward. Hence, +1 for option number 1,
> which will also allow FLIP-167 to make progress.
>
> Concerning subtasks with no partitions assigned, would it make sense to
> terminate these tasks at some point? That way, the stream would be closed
> and there is no need to maintain a stream status. Of course, this also
> requires at some point that Flink can start new sources when new partitions
> appear.
>
> Cheers,
> Till
>
> On Tue, Jun 8, 2021 at 9:26 AM Piotr Nowojski 
> wrote:
>
> > Hi Eron,
> >
> > The FLIP-167 is narrow, but we recently discovered some problems with
> > the current idleness semantics, as Arvid explained. We are planning to
> > present a new proposal to redefine them. Probably as a part of it, we
> > would need to rename them. Given that, I think it doesn't make sense to
> > expose idleness to the sinks before we rename and define it properly. In
> > other words:
> >
> > > 2. When the sink operator is idled, tell the sink function.
> >
> > We shouldn't expose stream status as a part of public API until it's
> > properly defined.
> >
> > I would propose one of the two things:
> > 1. Proceed with FLIP-167, without exposing idleness in the sinks YET.
> > Exposing idleness could be part of the next/future FLIP that would
> > define idleness in the first place.
> > 2. Block FLIP-167, until the idleness is fixed.
> >
> > I would vote for option number 1.
> >
> > Piotrek
> >
> > On Mon, Jun 7, 2021 at 18:08, Eron Wright wrote:
> >
> > > Piotr, David, and Arvid, we've had an expansive discussion but
> > > ultimately the proposal is narrow.  It is:
> > > 1. When a watermark arrives at the sink operator, tell the sink
> > > function.
> > > 2. When the sink operator is idled, tell the sink function.
> > >
> > > With these enhancements, we will significantly improve correctness in
> > > multi-stage flows, and facilitate an exciting project in the Pulsar
> > > community.  Would you please lend your support to FLIP-167 so that we
> > > can land this enhancement for 1.14?  My deepest thanks!
> > >
> > > -Eron
> > >
> > >
> > >
> > >
> > > On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise  wrote:
> > >
> > > > Hi Eron,
> > > >
> > > > you either have very specific use cases in mind or have a
> > > > misconception about idleness in Flink with the new sources. The basic
> > > > idea is that you have watermark generators only at the sources and the
> > > > user supplies them. As a source author, you have no option to limit
> > > > that. Here is a bit of background:
> > > >
> > > > We observed that many users that read from Kafka were confused about
> > > > no visible progress in their Flink applications because of some idle
> > > > partition, and we subsequently introduced idleness. Idleness was always
> > > > considered a means to achieve progress at the risk of losing a bit of
> > > > correctness. So especially in the case that you describe with a Pulsar
> > > > partition that is empty but indefinitely active, the user needs to be
> > > > able to use idleness such that downstream window operators progress.
> > > >
> > > > I hope to have clarified that "I wouldn't recommend using
> > > > withIdleness() with source-based watermarks." would pretty much make
> > > > the intended use case not work anymore.
> > > >
> > > > ---
> > > >
> > > > Nevertheless, from the discussion with you and some offline
> > > > discussion with Piotr and Dawid, we actually found quite a few
> > > > drawbacks of the current definition of idleness:
> > > > - We currently only use idleness to exclude respective upstream tasks
> > > > from participating in watermark generation (as you have eloquently put
> > > > further up in the thread).
> > > > - However, the definition is bound to records. So while a partition
> > > > is idle, no records should be produced.
> > > > - That brings us into quite a few edge cases, where operators emit
> > > > records while they are actually idling: think of timers, asyncIO
> > > > operators, window operators based on timeouts, etc.
> > > > - The solution would be to turn the operator active while emitting
> > > > and return to being idle afterwards (but when?). However, this has
> > > > some unintended side-effects depending on when you switch back.
> > > >
> > > > We are currently thinking that we should rephrase the definition to
> > > > what you described:
> > > > - A channel that is active is providing watermarks.
> > > > - An idle channel is not providing any watermarks but can 

[jira] [Created] (FLINK-22932) RocksDBStateBackendWindowITCase fails with savepoint timeout

2021-06-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-22932:
-

 Summary: RocksDBStateBackendWindowITCase fails with savepoint 
timeout
 Key: FLINK-22932
 URL: https://issues.apache.org/jira/browse/FLINK-22932
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.13.1
Reporter: Roman Khachatryan
 Fix For: 1.13.2


Initially 
[reported|https://issues.apache.org/jira/browse/FLINK-22067?focusedCommentId=17358306&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17358306]
 in FLINK-22067

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18709&view=logs&j=a8bc9173-2af6-5ba8-775c-12063b4f1d54&t=46a16c18-c679-5905-432b-9be5d8e27bc6&l=10183

A savepoint is triggered but does not complete in time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22931) Migrate to flink-shaded-force-shading

2021-06-08 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-22931:


 Summary: Migrate to flink-shaded-force-shading
 Key: FLINK-22931
 URL: https://issues.apache.org/jira/browse/FLINK-22931
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.14.0


Migrate to flink-shaded-force-shading, allowing us to drop force-shading 
and remove an annoying bit of our build setup.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Request to open the contributor permission!

2021-06-08 Thread w.gh123
Hi,


I want to contribute to Apache Flink. Would you please give me the 
contributor permission? My JIRA ID is hapihu

[jira] [Created] (FLINK-22930) [flink-python]: Resources should be closed

2021-06-08 Thread wuguihu (Jira)
wuguihu created FLINK-22930:
---

 Summary: [flink-python]: Resources should be closed
 Key: FLINK-22930
 URL: https://issues.apache.org/jira/browse/FLINK-22930
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.1
Reporter: wuguihu



Objects that implement the Closeable interface or its super-interface, 
AutoCloseable, need to be closed after use.

In the flink-python module, in the method extractZipFileWithPermissions() of 
the class "org.apache.flink.python.util.ZipUtils", the OutputStream should be 
closed.

The details are shown below:


{code:java}
// class: org.apache.flink.python.util.ZipUtils
// method: extractZipFileWithPermissions
// line 71: use try-with-resources

public static void extractZipFileWithPermissions(String zipFilePath, String targetPath)
        throws IOException {
    try (ZipFile zipFile = new ZipFile(zipFilePath)) {
        Enumeration<ZipArchiveEntry> entries = zipFile.getEntries();
        boolean isUnix = isUnix();

        while (entries.hasMoreElements()) {
            ZipArchiveEntry entry = entries.nextElement();
            File file;
            if (entry.isDirectory()) {
                ……
            } else {
                ……
                if (file.createNewFile()) {
                    // line 71
                    OutputStream output = new FileOutputStream(file);
                    IOUtils.copyBytes(zipFile.getInputStream(entry), output);
                } else {
                    throw new IOException(
                            "Create file: " + file.getAbsolutePath() + " failed!");
                }
            }
            if (isUnix) {
                ……
            }
        }
    }
}
{code}
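
For illustration, a minimal self-contained sketch of the try-with-resources pattern suggested for line 71. It is not the ZipUtils code: it assumes the JDK's java.util.zip.ZipFile instead of the commons-compress classes used in flink-python, copies with a plain buffer loop instead of Flink's IOUtils, and the class and method names (ZipExtractSketch, extract) are hypothetical.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical stand-in for the extraction loop; permissions handling is omitted.
class ZipExtractSketch {

    static void extract(String zipFilePath, String targetPath) throws IOException {
        File targetDir = new File(targetPath).getCanonicalFile();
        try (ZipFile zipFile = new ZipFile(zipFilePath)) {
            Enumeration<? extends ZipEntry> entries = zipFile.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                File file = new File(targetDir, entry.getName()).getCanonicalFile();
                // Refuse entries that would be written outside the target directory.
                if (!file.toPath().startsWith(targetDir.toPath())) {
                    throw new IOException("Entry outside target dir: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    file.mkdirs();
                    continue;
                }
                file.getParentFile().mkdirs();
                // Both streams are closed when this block exits, even if the copy throws.
                try (InputStream in = zipFile.getInputStream(entry);
                        OutputStream out = new FileOutputStream(file)) {
                    byte[] buffer = new byte[8192];
                    int read;
                    while ((read = in.read(buffer)) != -1) {
                        out.write(buffer, 0, read);
                    }
                }
            }
        }
    }
}
```

The inner try block declares both streams, so neither depends on the copy helper to close it.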






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-08 Thread Piotr Nowojski
Hi,

Thanks for resuming this discussion. I think +1 for the proposal of
dropping (deprecating) `dispose()`, and adding `flush()` to the
`StreamOperator`/udfs. Semantically it would be more like new `close()` is
an equivalent of old `dispose()`. Old `close()` is an equivalent of new
`flush() + close()`. I think it provides a relatively painless migration
path (could we write down this migration?).

However I have some doubts about the Flushable interface. First of all,
it wouldn't work for sinks - sinks have no output. Secondly, I don't like
that it opens a possibility for problems like this (incompatible output
types):
```
public class MyMap implements MapFunction, Flushable
{ ...}
```
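
To make the concern concrete, here is a minimal self-contained sketch. The interfaces are simplified stand-ins, and the type parameters and the `flush(Consumer<T>)` signature are assumptions for illustration only, not the proposed API. The point is just that the compiler accepts a function whose `map` output type differs from its `flush` output type:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for the interfaces under discussion.
interface MapFunction<I, O> {
    O map(I value);
}

interface Flushable<T> {
    void flush(Consumer<T> out);
}

// Compiles fine, although map() produces Long and flush() produces String.
class MismatchedMap implements MapFunction<String, Long>, Flushable<String> {
    public Long map(String value) {
        return (long) value.length();
    }

    public void flush(Consumer<String> out) {
        out.accept("leftover");
    }
}

class FlushableMismatchDemo {
    // Collects everything the function sends "downstream".
    static List<Object> run() {
        MismatchedMap function = new MismatchedMap();
        List<Object> downstream = new ArrayList<>();
        downstream.add(function.map("abc"));
        function.flush(downstream::add);
        return downstream;
    }
}
```
Nothing ties the two type parameters together, so the downstream ends up with a mix of Long and String values.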

Also after a quick offline discussion with Dawid, I'm not sure anymore to
which UDFs it actually makes sense to add `flush`, as most of them
shouldn't buffer any data. Apart from Sinks, it's usually an operator that
is buffering the data (that holds true for AsyncFunction, ReduceFunction,
AggregateFunction, MapFunction, FilterFunction, ...). For those functions
it's difficult to buffer any data, as they have no means to control when to
emit this data. One notable exception might be (Co)ProcessFunction, as it
can register timers on its own. In that case I would propose to do the
following thing:
1. Add `flush() {}` to `Sink` function (FLIP-143 Writer interface already
has flush capabilities)
2. Maybe add `flush(Collector)` to `(Co)ProcessFunction`, but maybe we
can postpone it
3. Leave other functions alone.

After all, we could add `flush()` to other functions in the future if we
really find a good motivating example to do so.

About 2. Dawid is pitching an idea to convert `ProcessFunction` into a
proper `Public` API that would replace StreamOperator. We could change
`StreamOperator` to be purely `@Internal` class/interface, and add the
missing functionality to the `ProcessFunction` (InputSelectable,
BoundedInput, MailboxExecutor). With this, adding `flush()` to
`ProcessFunction` would make a lot of sense. But maybe that should be a
story for another day?

Best,
Piotrek

On Fri, 4 Jun 2021 at 10:36, Yun Gao wrote:

> Hi all,
>
> Many thanks @Dawid for resuming the discussion and many thanks @Till for
> the summary! (And very sorry that I missed the mail and did not respond
> in time...)
>
> I also agree that we could consider the global commits later,
> separately, after we have addressed the final checkpoints, and also the other
> points as Till summarized.
> Currently the only cases that use the cascade commit are the Table
> FileSystem and Hive connectors. I checked the code and found that currently they
> commit the
> last piece of data directly in endOfInput(). Although this might emit
> repeated records if there is a failover while the job is finishing, it avoids
> emitting the records in
> notifyCheckpointComplete() after endOfInput(), thus the modification to
> the operator lifecycle in final checkpoints would not cause compatibility
> problems for these connectors,
> and we do not need to modify them in the first place.
>
> 2. Regarding the operator lifecycle, I also agree with the proposed
> changes. To sum up, I think the operator lifecycle would become
>
> endOfInput(1)
> ...
> endOfInput(n)
> flush() --> call UDF's flush method
> if some operator requires final checkpoints
> snapshotState()
> notifyCheckpointComplete()
> end if
> close() --> call UDF's close method
>
> Since currently the close() is only called in normal finish and dispose()
> will be called in both failover and normal case, for compatibility, I think
> we may
> have to postpone the change to a single close() method to version 2.0 ?
>
> 3. Regarding the name and position of flush() method, I also agree with
> that we will need a separate method to mark the termination of the whole
> stream for
> multiple-input streams. Would it be also ok if we have some modification
> to the current BoundedXXInput interfaces to
>
> interface BoundedInput {
>     void endInput(); // marks the end of the whole stream, as flush() does.
> }
>
> @Deprecated // In the future we could remove this interface
> interface BoundedOneInput extends BoundedInput {}
>
> interface BoundedMultiInput extends BoundedInput {
>     void endInput(int i);
>
>     default void endInput() {} // For compatibility
> }
>
> If operator/UDF does not care about the end of a single input, then it
> could directly implement the BoundedInput interface. The possible
> benefit to me is that we might be able to keep only one concept for
> marking the end of stream, especially for the operators with only
> one input.
>
> Many thanks for all the deep insights and discussions!
>
> Best,
> Yun
>
> --
> From:Dawid Wysakowicz 
> Send Time:2021 Jun. 3 (Thu.) 21:21
> To:dev ; Till Rohrmann ; Yun
> Gao 
> Cc:Piotr Nowojski ; Guowei Ma ;
> Stephan Ewen 
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> 

[jira] [Created] (FLINK-22929) Change the default failover strategy to FixDelayRestartStrategy

2021-06-08 Thread Yun Gao (Jira)
Yun Gao created FLINK-22929:
---

 Summary: Change the default failover strategy to 
FixDelayRestartStrategy
 Key: FLINK-22929
 URL: https://issues.apache.org/jira/browse/FLINK-22929
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.1, 1.13.0
Reporter: Yun Gao


Currently for the default failover strategy:
 # Stream Job without checkpoint: NoRestartStrategy
 # Stream Job with checkpoint:  FixDelayRestartStrategy as configured  [in this 
method|https://github.com/apache/flink/blob/ed6b33d487bccd9fd96607a3fe681ead1912d365/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java#L160]
 # Batch Job: NoRestartStrategy

 

The default failover strategy is reasonable for stream jobs, since without 
checkpoints a stream job cannot restart without paying high costs. 
However, for batch jobs, failover is handled via persisted intermediate 
result partitions, and users usually expect a batch job to finish normally 
by default (similar to other batch processing systems). Thus it seems more 
reasonable to make the default failover strategy for batch jobs the 
same as for stream jobs with checkpointing enabled (namely FixDelayRestartStrategy).
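
As a minimal sketch of the proposed change, the current and proposed defaults can be written as two small functions. The class, enum, and method names below are hypothetical and only mirror the list above; this is not the code in RestartBackoffTimeStrategyFactoryLoader.

```java
// Hypothetical illustration of the default selection; not Flink's loader code.
class DefaultRestartStrategySketch {
    enum JobType { STREAMING, BATCH }
    enum Strategy { NO_RESTART, FIXED_DELAY }

    /** Defaults as listed in this ticket. */
    static Strategy currentDefault(JobType type, boolean checkpointingEnabled) {
        if (type == JobType.STREAMING && checkpointingEnabled) {
            return Strategy.FIXED_DELAY;
        }
        return Strategy.NO_RESTART;
    }

    /** Proposal: batch jobs get the same default as checkpointed stream jobs. */
    static Strategy proposedDefault(JobType type, boolean checkpointingEnabled) {
        if (type == JobType.BATCH) {
            return Strategy.FIXED_DELAY;
        }
        return currentDefault(type, checkpointingEnabled);
    }
}
```

Only the batch case differs between the two functions; stream jobs keep their existing defaults.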

 

Some users have also [reported related 
issues|https://lists.apache.org/thread.html/rc4135e4ab41768f5fc3d4405b980872a6e39d2c0f5c92a744c623732%40%3Cuser.flink.apache.org%3E].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Definition of idle partitions

2021-06-08 Thread Piotr Nowojski
Hi Arvid,

Thanks for writing down this summary and proposal. I think this was the
foundation of the disagreement in the FLIP-167 discussion. Dawid was arguing
that idleness is an intermittent, strictly task-local concept and as such
shouldn't be exposed in, for example, sinks, while Eron and I thought that
it's a concept strictly connected to watermarks.

1. I'm a big +1 for changing the StreamStatus definition to stream "providing
watermark" and "not providing watermark". With respect to that I agree with
Dawid that record-bound idleness *(if we would ever need to define/expose
it)* should be an intermittent concept, like for example the existing
input availability in the Task/runtime (StreamTaskInput#isAvailable).
3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
I also don't have any good ideas.
`WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?

Best,
Piotrek

On Tue, 8 Jun 2021 at 16:35, Arvid Heise wrote:

> Hi devs,
>
> While discussing "Watermark propagation with Sink API" and during
> "[FLINK-18934] Idle stream does not advance watermark in connected stream",
> we noticed some drawbacks on how Flink defines idle partitions currently.
>
> To recap, idleness was always considered as a means to achieve progress in
> window operators with idle partition in the source at the risk of losing a
> bit of correctness. In particular, records could be considered late, simply
> because of that idleness timeout and not because they arrived out of order.
> A potential reprocessing would not be causing these records to be
> considered late and we may end up with a different (correct) result.
>
> The drawbacks that we discovered are as follows:
> - We currently only use idleness to exclude respective upstream tasks from
> participating in watermark generation.
> - However, the definition is bound to records. [1] In particular, while a
> partition is idle, no records should be produced.
> - That brings us into quite a few edge cases, where operators emit records,
> while they are actually idling: Think of timers, asyncIO operators, window
> operators based on timeouts, etc. that trigger on an operator ingesting an
> idle partition.
> - The proper solution would be to turn the operator active while emitting
> and to return to being idle afterwards (but when?). However, this has some
> unintended side-effects depending on when you switch back:
>   - If you toggle stream status for each record, you get a huge overhead on
> stream status records and quite a bit of processing in downstream operators
> (that code path is not much optimized since switching is considered a rare
> thing).
>   - If you toggle after a certain time, you may get delays>idleness in the
> downstream window operators.
>   - You could turn back when you processed all pending mails, but if you
> have a self-replicating mail that would be never. Self-enqueueing, low
> timer would also produce a flood similar to the first case.
>
> All in all, the situation is quite unsatisfying because idleness implies no
> records. However, currently there is no need to have that implication:
> since we only use it for watermarks, we can easily allow records to be
> emitted (in fact that was the old behavior before FLINK-18934 in many
> cases) and still get the intended behavior in respect to watermarks:
> - A channel that is active is providing watermarks.
> - An idle channel is not providing any watermarks but can deliver records.
>
> Ultimately, that would mean that we are actually not talking idle/active
> partitions anymore. We are talking more about whether a particular subtask
> should influence downstream watermark calculation or not. Leading to the
> following questions:
> 1. Do we want to change the definition as outlined?
> 2. Do you see any problem with emitting records on subtask without explicit
> watermarks?
> 3. If we want to go this way, we may need to refine the names/definitions.
> Any ideas?
>
> I think idle partition should translate into something like
> automatic/implicit/passive watermarks; active partition into
> explicit/active watermarks. Then StreamStatus is more about WatermarkMode
> (not really happy with this one).
>
> Best,
>
> Arvid
>
> [1]
>
> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
>


Re: Flink 1.14. Bi-weekly 2021-06-08

2021-06-08 Thread Till Rohrmann
Thanks for this update Joe, Dawid and Xintong. This is super helpful!

Cheers,
Till

On Tue, Jun 8, 2021 at 4:18 PM Johannes Moser  wrote:

> Hi,
>
> Today we had our first bi-weekly.
> Here’s a short summary of what has been discussed.
>
> Please watch the 1.14. Release page [1] to stay up to date.
>
> * Feature freeze date *
> In response to our last email, the question was raised whether to push the feature
> freeze date back by a month, which would mean early September. Vanilla
> Flink Users who might be in favour of keeping it like it is, didn’t seem to
> participate in the discussion so the decision might be biased. We agreed to
> collect further feedback till the next bi-weekly and decide then.
>
> * Build stability *
> We started the release cycle with around 200 test stability issues [2].
> For 1.13. at this stage it was half of this.
> We agreed to do a push within the next two weeks to reduce the amount of
> issues and generally carefully avoid introducing new instabilities.
> We kindly ask every contributor and team working on some Flink components
> to go through them and try to close/get rid of as many as possible.
>
> * Documentation & Release advertisement *
> Let’s aim for making documentation and release advertisement through
> blogposts, meet-ups and presentations a part of the development process. So
> while working on a feature every contributor should already include the
> documentation and plan the advertisement of it.
>
> * Feature list freeze *
> We’d suggest to freeze the list of features that will go into 1.14. in the
> next bi-weekly, so please make sure your item shows up there.
> The release managers will also structure the list a bit more in the next
> couple of days, when the first wave of edits is gone.
>
> So what can you do to make 1.14. a successful release:
> * Provide feedback on the feature freeze date.
> * Get rid of instabilities and don’t introduce new ones.
> * Document and advertise the stuff you do.
> * Add your planned features to the feature list.
>
> Thanks for all your effort,
>
> Dawid, Xintong & Joe
>
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/1.14+Release
> [2] https://issues.apache.org/jira/issues/?filter=12343317


[jira] [Created] (FLINK-22928) Unexpected exception happens in RecordWriter when stopping-with-savepoint

2021-06-08 Thread Yun Gao (Jira)
Yun Gao created FLINK-22928:
---

 Summary: Unexpected exception happens in RecordWriter when 
stopping-with-savepoint
 Key: FLINK-22928
 URL: https://issues.apache.org/jira/browse/FLINK-22928
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.11.2
Reporter: Yun Gao


{code:java}
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
    at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
    ... 9 more
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
    ... 21 more
Caused by: java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
    ... 25 more
{code}
The issue seems to happen when stopping a job with stop-with-savepoint. It is 

[DISCUSS] Definition of idle partitions

2021-06-08 Thread Arvid Heise
Hi devs,

While discussing "Watermark propagation with Sink API" and during
"[FLINK-18934] Idle stream does not advance watermark in connected stream",
we noticed some drawbacks on how Flink defines idle partitions currently.

To recap, idleness was always considered as a means to achieve progress in
window operators with idle partition in the source at the risk of losing a
bit of correctness. In particular, records could be considered late, simply
because of that idleness timeout and not because they arrived out of order.
A potential reprocessing would not be causing these records to be
considered late and we may end up with a different (correct) result.

The drawbacks that we discovered are as follows:
- We currently only use idleness to exclude respective upstream tasks from
participating in watermark generation.
- However, the definition is bound to records. [1] In particular, while a
partition is idle, no records should be produced.
- That brings us into quite a few edge cases, where operators emit records,
while they are actually idling: Think of timers, asyncIO operators, window
operators based on timeouts, etc. that trigger on an operator ingesting an
idle partition.
- The proper solution would be to turn the operator active while emitting
and to return to being idle afterwards (but when?). However, this has some
unintended side-effects depending on when you switch back:
  - If you toggle stream status for each record, you get a huge overhead on
stream status records and quite a bit of processing in downstream operators
(that code path is not much optimized since switching is considered a rare
thing).
  - If you toggle after a certain time, you may get delays>idleness in the
downstream window operators.
  - You could turn back when you processed all pending mails, but if you
have a self-replicating mail that would be never. Self-enqueueing, low
timer would also produce a flood similar to the first case.

All in all, the situation is quite unsatisfying because idleness implies no
records. However, currently there is no need to have that implication:
since we only use it for watermarks, we can easily allow records to be
emitted (in fact that was the old behavior before FLINK-18934 in many
cases) and still get the intended behavior in respect to watermarks:
- A channel that is active is providing watermarks.
- An idle channel is not providing any watermarks but can deliver records.
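
A minimal sketch of these two rules, as I read them. The class below is a simplified, hypothetical stand-in for the downstream min-watermark logic, not Flink's actual implementation. It also assumes that a channel becomes active again only when it emits a watermark, never when it emits a record.

```java
import java.util.Arrays;

// Hypothetical, simplified model of combining watermarks across input channels.
class WatermarkStatusSketch {
    private final long[] channelWatermark;
    private final boolean[] providesWatermarks;
    private long lastOutput = Long.MIN_VALUE;
    private int deliveredRecords = 0;

    WatermarkStatusSketch(int channels) {
        channelWatermark = new long[channels];
        providesWatermarks = new boolean[channels];
        Arrays.fill(channelWatermark, Long.MIN_VALUE);
        Arrays.fill(providesWatermarks, true);
    }

    /** The channel stops taking part in the watermark calculation. */
    void onIdle(int channel) {
        providesWatermarks[channel] = false;
    }

    /** Records are delivered regardless of status and do not change it. */
    void onRecord(int channel) {
        deliveredRecords++;
    }

    /** Assumption: only a watermark switches a channel back to active. */
    void onWatermark(int channel, long timestamp) {
        providesWatermarks[channel] = true;
        channelWatermark[channel] = Math.max(channelWatermark[channel], timestamp);
    }

    /** Minimum over active channels; never moves backwards. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermark.length; i++) {
            if (providesWatermarks[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermark[i]);
            }
        }
        if (anyActive) {
            lastOutput = Math.max(lastOutput, min);
        }
        return lastOutput;
    }

    int deliveredRecords() {
        return deliveredRecords;
    }
}
```

With two channels at watermarks 10 and 5, the combined watermark is 5. Once the second channel is idle, it can still deliver records while the combined watermark follows the first channel alone.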

Ultimately, that would mean that we are actually not talking idle/active
partitions anymore. We are talking more about whether a particular subtask
should influence downstream watermark calculation or not. Leading to the
following questions:
1. Do we want to change the definition as outlined?
2. Do you see any problem with emitting records on subtask without explicit
watermarks?
3. If we want to go this way, we may need to refine the names/definitions.
Any ideas?

I think idle partition should translate into something like
automatic/implicit/passive watermarks; active partition into
explicit/active watermarks. Then StreamStatus is more about WatermarkMode
(not really happy with this one).

Best,

Arvid

[1]
https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86


Flink 1.14. Bi-weekly 2021-06-08

2021-06-08 Thread Johannes Moser
Hi,

Today we had our first bi-weekly.
Here’s a short summary of what has been discussed.

Please watch the 1.14. Release page [1] to stay up to date.

* Feature freeze date *
In response to our last email, the question was raised whether to push the feature freeze 
date back by a month, which would mean early September. Vanilla Flink Users who 
might be in favour of keeping it like it is, didn’t seem to participate in the 
discussion so the decision might be biased. We agreed to collect further 
feedback till the next bi-weekly and decide then.

* Build stability *
We started the release cycle with around 200 test stability issues [2]. For 
1.13. at this stage it was half of this.
We agreed to do a push within the next two weeks to reduce the amount of issues 
and generally carefully avoid introducing new instabilities.
We kindly ask every contributor and team working on some Flink components to go 
through them and try to close/get rid of as many as possible.

* Documentation & Release advertisement *
Let’s aim for making documentation and release advertisement through blogposts, 
meet-ups and presentations a part of the development process. So while working 
on a feature every contributor should already include the documentation and 
plan the advertisement of it.

* Feature list freeze *
We’d suggest to freeze the list of features that will go into 1.14. in the next 
bi-weekly, so please make sure your item shows up there.
The release managers will also structure the list a bit more in the next couple 
of days, when the first wave of edits is gone.

So what can you do to make 1.14. a successful release:
* Provide feedback on the feature freeze date.
* Get rid of instabilities and don’t introduce new ones.
* Document and advertise the stuff you do.
* Add your planned features to the feature list.

Thanks for all your effort,

Dawid, Xintong & Joe


[1] https://cwiki.apache.org/confluence/display/FLINK/1.14+Release 

[2] https://issues.apache.org/jira/issues/?filter=12343317 


[jira] [Created] (FLINK-22927) Exception on JobClient.get_job_status().result()

2021-06-08 Thread Jira
Maciej Bryński created FLINK-22927:
--

 Summary: Exception on JobClient.get_job_status().result()
 Key: FLINK-22927
 URL: https://issues.apache.org/jira/browse/FLINK-22927
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0
Reporter: Maciej Bryński


The following code finishes with an exception:
{code:java}
result = table_env.execute_sql("INSERT INTO print SELECT * FROM datagen")
result.get_job_client().get_job_status().result()

---
ValueErrorTraceback (most recent call last)
ValueError: JavaObject id=o125 is not a valid JobStatus

During handling of the above exception, another exception occurred:

ValueErrorTraceback (most recent call last)
 in 
> 1 result.get_job_client().get_job_status().result()

/usr/local/lib/python3.8/dist-packages/pyflink/common/completable_future.py in 
result(self)
 76 return self._j_completable_future.get()
 77 else:
---> 78 return self._py_class(self._j_completable_future.get())
 79 
 80 def exception(self):

/usr/lib/python3.8/enum.py in __call__(cls, value, names, module, qualname, 
type, start)
307 """
308 if names is None:  # simple value lookup
--> 309 return cls.__new__(cls, value)
310 # otherwise, functional API: we're creating a new Enum type
311 return cls._create_(value, names, module=module, 
qualname=qualname, type=type, start=start)

/usr/lib/python3.8/enum.py in __new__(cls, value)
598 )
599 exc.__context__ = ve_exc
--> 600 raise exc
601 
602 def _generate_next_value_(name, start, count, last_values):

/usr/lib/python3.8/enum.py in __new__(cls, value)
582 try:
583 exc = None
--> 584 result = cls._missing_(value)
585 except Exception as e:
586 exc = e

/usr/lib/python3.8/enum.py in _missing_(cls, value)
611 @classmethod
612 def _missing_(cls, value):
--> 613 raise ValueError("%r is not a valid %s" % (value, cls.__name__))
614 
615 def __repr__(self):

ValueError: JavaObject id=o125 is not a valid JobStatus
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22926) IDLE source should go ACTIVE when registering a new split

2021-06-08 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22926:


 Summary: IDLE source should go ACTIVE when registering a new split
 Key: FLINK-22926
 URL: https://issues.apache.org/jira/browse/FLINK-22926
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.14.0


When a FLIP-27 source is IDLE and registers a new split, it does not 
immediately go ACTIVE. We should consider watermarks from a newly registered 
split immediately after registration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22925) "FieldDescriptor does not match message type" ERROR when use protobuf-router

2021-06-08 Thread Bill lee (Jira)
Bill lee created FLINK-22925:


 Summary: "FieldDescriptor does not match message type" ERROR  when 
use protobuf-router
 Key: FLINK-22925
 URL: https://issues.apache.org/jira/browse/FLINK-22925
 Project: Flink
  Issue Type: Bug
  Components: Stateful Functions
Affects Versions: statefun-3.0.0
 Environment: Flink 1.12.2

stateful function 3.0.0
Reporter: Bill lee
 Attachments: image-2021-06-08-21-22-01-748.png

When using protobuf-router in the I/O module as follows: 
{code:java}
version: "3.0"
module:
  meta:
    type: remote
  spec:
    endpoints:
      - endpoint:
          meta:
            kind: http
          spec:
            functions: dga/*
            urlPathTemplate: http://cic02:/statefun
            timeouts:
              call: 2min
    ingresses:
      - ingress:
          meta:
            type: statefun.kafka.io/protobuf-ingress
            id: dga/names
          spec:
            address: cic02:9092
            consumerGroupId: my-group-id
            topics:
              - index_events
            messageType: com.my.protobuf.XxMessage
            descriptorSet: classpath:stream.desc
    egresses:
      - egress:
          meta:
            type: io.statefun.kafka/egress
            id: dga/greets
          spec:
            address: cic02:9092
            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 10
    routers:
      - router:
          meta:
            type: org.apache.flink.statefun.sdk/protobuf-router
          spec:
            ingress: dga/names
            target: "dga/person/{{$.src_ip}}"
            messageType: com.my.protobuf.XxMessage
            descriptorSet: classpath:stream.desc
{code}
I got  protobuf error : "FieldDescriptor does not match message type"

!image-2021-06-08-21-22-01-748.png!

And then I make a test like this:
{code:java}
  @Test
  public void exampleUsage01() throws IOException {
    Message originalMessage = SimpleMessage.newBuilder().setName("bob").build();

    // First descriptor map: used to parse the message dynamically.
    ProtobufDescriptorMap descriptorPath01 = ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
    Optional maybeDescriptor01 =
        descriptorPath01.getDescriptorByName("org.apache.flink.test.SimpleMessage");
    Descriptors.Descriptor descriptor = (Descriptors.Descriptor) maybeDescriptor01.get();
    DynamicMessage dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
    Parser<DynamicMessage> parser = dynamicMessage.getParserForType();
    Message message = parser.parseFrom(originalMessage.toByteArray());

    // Second descriptor map, loaded separately: used to resolve the address.
    ProtobufDescriptorMap descriptorPath = ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
    Optional maybeDescriptor =
        descriptorPath.getDescriptorByName("org.apache.flink.test.SimpleMessage");
    AddressResolver evaluator =
        AddressResolver.fromAddressTemplate(
            (Descriptors.Descriptor) maybeDescriptor.get(), "dga/person/{{$.name}}");
    Address targetAddress = evaluator.evaluate(message);
    System.out.println(targetAddress);
  }
{code}
This also produced the same error.

I think the cause is that descriptorSet is defined in both the ingress and the 
router, which generates two different Descriptors for the same message.

Please correct me if I am wrong.

Any advice on this problem? Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22924) Expose create_local_environment in PyFlink

2021-06-08 Thread Jira
Maciej Bryński created FLINK-22924:
--

 Summary: Expose create_local_environment in PyFlink
 Key: FLINK-22924
 URL: https://issues.apache.org/jira/browse/FLINK-22924
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0
Reporter: Maciej Bryński


StreamExecutionEnvironment has some methods for creating local env with 
configuration.

We should expose them in Python API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Till Rohrmann
Great :-)

On Tue, Jun 8, 2021 at 1:11 PM Yingjie Cao  wrote:

> Hi Till,
>
> Thanks for the suggestion. The blog post is already on the way.
>
> Best,
> Yingjie
>
> On Tue, Jun 8, 2021 at 5:30 PM Till Rohrmann wrote:
>
>> Thanks for the update Yingjie. Would it make sense to write a short blog
>> post about this feature including some performance improvement numbers? I
>> think this could be interesting to our users.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li 
>> wrote:
>>
>>> Thanks Yingjie for the great effort!
>>>
>>> This is really helpful to Flink Batch users!
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao 
>>> wrote:
>>>
>>> > Hi devs & users,
>>> >
>>> > The FLIP-148[1] has been released with Flink 1.13 and the final
>>> > implementation has some differences compared with the initial proposal in
>>> > the FLIP document. To avoid potential misunderstandings, I have updated
>>> > the FLIP document[1] accordingly and I also drafted another document[2]
>>> > which contains more implementation details. FYI.
>>> >
>>> > [1]
>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
>>> > [2]
>>> > https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>>> >
>>> > Best,
>>> > Yingjie
>>> >
>>> > On Thu, Oct 15, 2020 at 11:02 AM Yingjie Cao wrote:
>>> >
>>> >> Hi devs,
>>> >>
>>> >> Currently, Flink adopts a hash-style blocking shuffle implementation
>>> >> which writes data sent to different reducer tasks into separate files
>>> >> concurrently. Compared to the sort-merge based approach, which writes
>>> >> that data together into a single file and merges those small files into
>>> >> bigger ones, the hash-based approach has several weak points when it
>>> >> comes to running large scale batch jobs:
>>> >>
>>> >>    1. *Stability*: For high parallelism (tens of thousands) batch jobs,
>>> >>    the current hash-based blocking shuffle implementation writes too
>>> >>    many files concurrently, which puts high pressure on the file system,
>>> >>    for example, maintenance of too many file metas and exhaustion of
>>> >>    inodes or file descriptors. All of these can be potential stability
>>> >>    issues. Sort-merge based blocking shuffle doesn't have this problem
>>> >>    because, for one result partition, only one file is written at a time.
>>> >>    2. *Performance*: Large amounts of small shuffle files and random IO
>>> >>    can influence shuffle performance a lot, especially for HDD (for SSD,
>>> >>    sequential read is also important because of read-ahead and cache).
>>> >>    For batch jobs processing massive data, a small amount of data per
>>> >>    subpartition is common because of high parallelism. Besides, data
>>> >>    skew is another cause of small subpartition files. By merging data of
>>> >>    all subpartitions together in one file, more sequential reads can be
>>> >>    achieved.
>>> >>    3. *Resource*: For the current hash-based implementation, each
>>> >>    subpartition needs at least one buffer. For large scale batch
>>> >>    shuffles, the memory consumption can be huge. For example, we need at
>>> >>    least 320M network memory per result partition if parallelism is set
>>> >>    to 1, and because of the huge network consumption, it is hard to
>>> >>    configure the network memory for large scale batch jobs, and
>>> >>    sometimes parallelism cannot be increased just because of
>>> >>    insufficient network memory, which leads to a bad user experience.
>>> >>
>>> >> To improve Flink’s capability of running large scale batch jobs, we
>>> >> would like to introduce sort-merge based blocking shuffle to Flink[1].
>>> >> Any feedback is appreciated.
>>> >>
>>> >> [1]
>>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>> >>
>>> >> Best,
>>> >> Yingjie
>>> >>
>>> >
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>


Re: [DISCUSS] Limit size of already processed files in File Source SplitEnumerator

2021-06-08 Thread Arvid Heise
Hi Tianxin,

I assigned you the ticket, so you could go ahead and create some POC PR. I
would like to understand the issue first a bit better and then give some
things to consider. In general, I see your point that in a potentially
infinitely running application keeping track of all read entities will
eventually lead to an OOM.

1. About which order of magnitude are we talking? Your average path should
be less than 1kb, so 1 million paths, is 1 GB. I'm assuming you have hit
that limit already and see memory issues on the job manager?
2. At that point, do we need to improve the scanning already? How long does
that take?
3. Doesn't NonSplittingRecursiveEnumerator#enumerateSplits always return a
full collection of files as well? So isn't the problem appearing there as
well?

For pruning, I'm not too sure. I fear that we always run into issues with
filesystems that are not behaving as well as the local filesystem.
4. If we want to prune the set as you suggested, how can we be sure that we
are not ignoring any file that is supposed to be processed?
Consider a file X with modification time M that is only visible in the
eventually consistent filesystem at time M2. If we have a checkpoint between
M and M2, wouldn't we also prune X, even though it has not been read yet?
5. With all time-based proposals: what happens if you have any kind of time
drift?

As an alternative
6. Couldn't we move the information fully to a state backend, such that
when the application grows, we can use the disk as well? However, I also
saw that we currently do not allow user state in sources at all.
7. We could also compress the information (hash?). But that's probably just
buying us a little time.
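
To make point 7 a bit more concrete, here is a minimal sketch of storing a
fixed-size digest per path instead of the full path string. HashedPathSet is a
hypothetical helper, not an existing Flink class, and the choice of SHA-256 is
only an assumption for illustration. The set still grows with the number of
files, so this only reduces the per-entry cost.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper (not part of Flink): remembers paths by their SHA-256 digest only. */
final class HashedPathSet {
    // ByteBuffer implements equals/hashCode over its content, unlike byte[].
    private final Set<ByteBuffer> digests = new HashSet<>();

    /** Returns true if the path was not known before. */
    boolean add(String path) {
        return digests.add(ByteBuffer.wrap(sha256(path)));
    }

    boolean contains(String path) {
        return digests.contains(ByteBuffer.wrap(sha256(path)));
    }

    int size() {
        return digests.size();
    }

    private static byte[] sha256(String path) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(path.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is one of the algorithms every Java platform has to provide.
            throw new IllegalStateException(e);
        }
    }
}
```

Each entry then costs 32 bytes of digest plus object overhead, regardless of
how long the original path was.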

In general, it might actually be best to move the decision completely into
user code land. Maybe we should change FileEnumerator#enumerateSplits to
not return duplicates and allow it to be stateful. Then the user can make a
conscious decision based on the used filesystem. We could provide pruning
enumerator implementations and the user just has to pick the appropriate
one if the default one doesn't work anymore.


On Tue, Jun 8, 2021 at 12:54 PM Guowei Ma  wrote:

> It would really simplify a lot if the modification timestamp of each newly
> scanned file is increased.
>
>  We only need to record the file list corresponding to the largest
> timestamp.  Timestamp of each scanned file
>  1. It is smaller than the maximum timestamp, which means it has been
> processed;
>  2. The timestamps are equal, so you need to see if it is in this file
> list, if it is, you don't need to process it, if it is not, you need to
> process it;
>  3. It is larger than the maximum timestamp and has not been processed
>
>  If the maximum timestamp is dynamically restored from the file list every
> time it is started, the state compatibility issue can be ignored.
>
>
> BTW I haven't done any test, but I am actually a little curious, if a lot
> of files have been processed, is the scan itself already very slow?  I mean
> maybe the bottleneck at the beginning might be scan?
>
>
>
> > On Jun 8, 2021, at 5:41 PM, Till Rohrmann wrote:
> >
> > Hi Tianxin,
> >
> > thanks for starting this discussion. I am pulling in Arvid who works on
> > Flink's connectors.
> >
> > I think the problem you are describing can happen.
> >
> > From what I understand you are proposing to keep track of the watermark of
> > processed file input splits and then filter out splits based on their
> > modification timestamps and the watermark. What is the benefit of keeping
> > for every split the modification timestamp in the map? Could it also work
> > if we sort the input splits according to their modification timestamps and
> > then remember the last processed split? That way we only remember a single
> > value and upon recovery, we only process those splits which have a newer
> > modification timestamp.
> >
> > Cheers,
> > Till
> >
> >> On Tue, Jun 8, 2021 at 12:11 AM Tianxin Zhao wrote:
> >>
> >> Hi!
> >>
> >> Currently Flink File Source relies on a Set pathsAlreadyProcessed in
> >> SplitEnumerator to decide which file has been processed and avoids
> >> reprocessing files if a file is already in this set. However this set could
> >> be ever growing and ultimately exceed memory size if there are new files
> >> continuously added to the input path.
> >>
> >> I submitted https://issues.apache.org/jira/browse/FLINK-22792 and would
> >> like to be assigned to the ticket.
> >>
> >> The currently proposed change is as follows; I would like to get agreement
> >> on the approach taken.
> >>
> >>   1. Maintain a fileWatermark, updated by the modification time of new
> >>   files, in ContinuousFileSplitEnumerator.
> >>   2. Change Set pathsAlreadyProcessed to a HashMap pathsAlreadyProcessed
> >>   where the key is the same as before (the file path of already processed
> >>   files) and the value is the file modification time; expose a
> >>   getModificationTime() method on FileSourceSplit.
> >>
> >>

Re: How to unsubscribe?

2021-06-08 Thread Leonard Xu
Hi, Morgan

Sending an email with any content to user-unsubscr...@flink.apache.org 
will unsubscribe you from the Flink user mailing list.
Likewise, sending an email with any content to dev-unsubscr...@flink.apache.org 
will unsubscribe you from the Flink dev mailing list.

Please make sure you send the email to the correct address.

Best,
Leonard

[jira] [Created] (FLINK-22923) Queryable state (rocksdb) with TM restart end-to-end test unstable

2021-06-08 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-22923:
--

 Summary: Queryable state (rocksdb) with TM restart end-to-end test 
unstable
 Key: FLINK-22923
 URL: https://issues.apache.org/jira/browse/FLINK-22923
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Queryable State
Affects Versions: 1.14.0
Reporter: Robert Metzger
 Fix For: 1.14.0


https://dev.azure.com/rmetzger/Flink/_build/results?buildId=9119&view=logs&j=9401bf33-03c4-5a24-83fe-e51d75db73ef&t=72901ab2-7cd0-57be-82b1-bca51de20fba

{code}
Jun 04 19:39:12 16/17 completed checkpoints
Jun 04 19:39:14 16/17 completed checkpoints
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
Jun 04 19:39:17 after: 20
Jun 04 19:39:17 An error occurred
Jun 04 19:39:17 [FAIL] Test script contains errors.
Jun 04 19:39:17 Checking of logs skipped.
Jun 04 19:39:17 
Jun 04 19:39:17 [FAIL] 'Queryable state (rocksdb) with TM restart end-to-end 
test' failed after 0 minutes and 48 seconds! Test exited with exit code 1
Jun 04 19:39:17 

{code}






[jira] [Created] (FLINK-22922) Migrate flink website to hugo

2021-06-08 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-22922:


 Summary: Migrate flink website to hugo
 Key: FLINK-22922
 URL: https://issues.apache.org/jira/browse/FLINK-22922
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Reporter: Chesnay Schepler


Hugo is working like a charm for the Flink documentation. To reduce the number 
of software stacks, and massively reduce friction when building the current 
Flink website, we should migrate the Flink website to hugo as well.





Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Yingjie Cao
Hi Till,

Thanks for the suggestion. The blog post is already on the way.

Best,
Yingjie

On Tue, Jun 8, 2021 at 5:30 PM Till Rohrmann wrote:

> Thanks for the update Yingjie. Would it make sense to write a short blog
> post about this feature including some performance improvement numbers? I
> think this could be interesting to our users.
>
> Cheers,
> Till
>
> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li  wrote:
>
>> Thanks Yingjie for the great effort!
>>
>> This is really helpful to Flink Batch users!
>>
>> Best,
>> Jingsong
>>
>> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao 
>> wrote:
>>
>> > Hi devs & users,
>> >
>> > The FLIP-148[1] has been released with Flink 1.13 and the final
>> > implementation has some differences compared with the initial proposal in
>> > the FLIP document. To avoid potential misunderstandings, I have updated the
>> > FLIP document[1] accordingly and I also drafted another document[2] which
>> > contains more implementation details. FYI.
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
>> > [2]
>> > https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>> >
>> > Best,
>> > Yingjie
>> >
>> > On Thu, Oct 15, 2020 at 11:02 AM Yingjie Cao wrote:
>> >
>> >> Hi devs,
>> >>
>> >> Currently, Flink adopts a hash-style blocking shuffle implementation
>> >> which writes data sent to different reducer tasks into separate files
>> >> concurrently. Compared to the sort-merge based approach, which writes
>> >> that data together into a single file and merges those small files into
>> >> bigger ones, the hash-based approach has several weak points when it
>> >> comes to running large scale batch jobs:
>> >>
>> >>    1. *Stability*: For high parallelism (tens of thousands) batch jobs,
>> >>    the current hash-based blocking shuffle implementation writes too many
>> >>    files concurrently, which puts high pressure on the file system, for
>> >>    example, maintenance of too many file metas and exhaustion of inodes
>> >>    or file descriptors. All of these can be potential stability issues.
>> >>    Sort-merge based blocking shuffle doesn't have this problem because,
>> >>    for one result partition, only one file is written at a time.
>> >>    2. *Performance*: Large amounts of small shuffle files and random IO
>> >>    can influence shuffle performance a lot, especially for HDD (for SSD,
>> >>    sequential read is also important because of read-ahead and cache).
>> >>    For batch jobs processing massive data, a small amount of data per
>> >>    subpartition is common because of high parallelism. Besides, data skew
>> >>    is another cause of small subpartition files. By merging data of all
>> >>    subpartitions together in one file, more sequential reads can be
>> >>    achieved.
>> >>    3. *Resource*: For the current hash-based implementation, each
>> >>    subpartition needs at least one buffer. For large scale batch
>> >>    shuffles, the memory consumption can be huge. For example, we need at
>> >>    least 320M network memory per result partition if parallelism is set
>> >>    to 1, and because of the huge network consumption, it is hard to
>> >>    configure the network memory for large scale batch jobs, and sometimes
>> >>    parallelism cannot be increased just because of insufficient network
>> >>    memory, which leads to a bad user experience.
>> >>
>> >> To improve Flink’s capability of running large scale batch jobs, we would
>> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>> >> feedback is appreciated.
>> >>
>> >> [1]
>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>> >>
>> >> Best,
>> >> Yingjie
>> >>
>> >
>>
>> --
>> Best, Jingsong Lee
>>
>


Re: [DISCUSS] Limit size of already processed files in File Source SplitEnumerator

2021-06-08 Thread Guowei Ma
It would really simplify things a lot if the modification timestamps of newly 
scanned files were always increasing.

 We would only need to record the file list corresponding to the largest 
timestamp. For the timestamp of each scanned file, there are three cases:
 1. It is smaller than the maximum timestamp, which means the file has been 
processed.
 2. It is equal to the maximum timestamp, so you need to check whether the file 
is in this file list: if it is, you don't need to process it; if it is not, you 
need to process it.
 3. It is larger than the maximum timestamp, which means the file has not been 
processed.

 If the maximum timestamp is dynamically restored from the file list every time 
the job is started, the state compatibility issue can be ignored.
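
A minimal sketch of these three cases, under the assumption stated above that
files are seen in non-decreasing order of modification time (within a scan
this would mean visiting them sorted by timestamp). MaxTimestampTracker is a
hypothetical name used only for illustration and is not an existing Flink
class.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: keeps only the largest timestamp and the paths that carry it. */
final class MaxTimestampTracker {
    private long maxTimestamp = Long.MIN_VALUE;
    private final Set<String> pathsAtMax = new HashSet<>();

    /** Returns true if the file still has to be processed, and records it as seen. */
    boolean shouldProcess(String path, long modificationTime) {
        if (modificationTime < maxTimestamp) {
            // Case 1: older than the maximum, treated as already processed.
            return false;
        }
        if (modificationTime == maxTimestamp) {
            // Case 2: same timestamp, so the file list decides (add returns false if present).
            return pathsAtMax.add(path);
        }
        // Case 3: newer than everything seen so far; the old file list can be dropped.
        maxTimestamp = modificationTime;
        pathsAtMax.clear();
        pathsAtMax.add(path);
        return true;
    }

    /** Number of paths that currently have to be kept in state. */
    int trackedPaths() {
        return pathsAtMax.size();
    }
}
```

The state then stays proportional to the number of files sharing the newest
timestamp, at the cost of skipping any file that shows up late with an older
timestamp.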


BTW, I haven't done any tests, but I am actually a little curious: if a lot of 
files have been processed, is the scan itself already very slow? I mean, maybe 
the first bottleneck is the scan itself?



> On Jun 8, 2021, at 5:41 PM, Till Rohrmann wrote:
> 
> Hi Tianxin,
> 
> thanks for starting this discussion. I am pulling in Arvid who works on
> Flink's connectors.
> 
> I think the problem you are describing can happen.
> 
> From what I understand you are proposing to keep track of the watermark of
> processed file input splits and then filter out splits based on their
> modification timestamps and the watermark. What is the benefit of keeping
> for every split the modification timestamp in the map? Could it also work
> if we sort the input splits according to their modification timestamps and
> then remember the last processed split? That way we only remember a single
> value and upon recovery, we only process those splits which have a newer
> modification timestamp.
> 
> Cheers,
> Till
> 
>> On Tue, Jun 8, 2021 at 12:11 AM Tianxin Zhao  wrote:
>> 
>> Hi!
>> 
>> Currently Flink File Source relies on a Set pathsAlreadyProcessed in
>> SplitEnumerator to decide which file has been processed and avoids
>> reprocessing files if a file is already in this set. However this set could
>> be ever growing and ultimately exceed memory size if there are new files
>> continuously added to the input path.
>> 
>> I submitted https://issues.apache.org/jira/browse/FLINK-22792 and would
>> like to be assigned to the ticket.
>> 
>> The currently proposed change is as follows; I would like to get agreement
>> on the approach taken.
>> 
>>   1. Maintain a fileWatermark, updated by the modification time of new
>>   files, in ContinuousFileSplitEnumerator.
>>   2. Change Set pathsAlreadyProcessed to a HashMap pathsAlreadyProcessed
>>   where the key is the same as before (the file path of already processed
>>   files) and the value is the file modification time; expose a
>>   getModificationTime() method on FileSourceSplit.
>>   3. Add a user-configurable fileExpireTime config; any files older than
>>   fileWatermark - fileExpireTime would get ignored.
>>   4. When snapshotting the splitEnumerator, remove files that are older than
>>   fileWatermark - fileExpireTime from the pathsAlreadyProcessed map.
>>   5. Add the alreadyProcessedPaths map and fileWatermark to
>>   PendingSplitsCheckpoint, and modify the current
>>   PendingSplitsCheckpointSerializer to add a version 2 serializer that
>>   serializes the alreadyProcessedPaths map, which includes the file
>>   modification time.
>>   6. Subclasses of PendingSplitsCheckpoint like
>>   ContinuousHivePendingSplitsCheckpoint would not be impacted, by
>>   initializing an empty alreadyProcessedMap and 0 as the initial watermark.
>> 
>> Thanks!
>> 
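
The watermark-plus-expiry idea in the quoted proposal could be sketched
roughly as follows. This is only an illustration under assumptions:
ExpiringProcessedPaths is a hypothetical class, paths are plain strings, and
times are epoch milliseconds; it is not the code of
ContinuousFileSplitEnumerator.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: processed paths with modification times, pruned by a watermark. */
final class ExpiringProcessedPaths {
    private final Map<String, Long> pathsAlreadyProcessed = new HashMap<>();
    private final long fileExpireTime;
    private long fileWatermark = Long.MIN_VALUE; // no file seen yet

    ExpiringProcessedPaths(long fileExpireTimeMillis) {
        this.fileExpireTime = fileExpireTimeMillis;
    }

    /** Returns true if the file should be read; expired or already known files are skipped. */
    boolean shouldProcess(String path, long modificationTime) {
        if (modificationTime < cutoff() || pathsAlreadyProcessed.containsKey(path)) {
            return false;
        }
        pathsAlreadyProcessed.put(path, modificationTime);
        fileWatermark = Math.max(fileWatermark, modificationTime);
        return true;
    }

    /** Called when snapshotting: drops entries older than fileWatermark - fileExpireTime. */
    void pruneOnSnapshot() {
        long cutoff = cutoff();
        pathsAlreadyProcessed.values().removeIf(timestamp -> timestamp < cutoff);
    }

    int size() {
        return pathsAlreadyProcessed.size();
    }

    private long cutoff() {
        // Avoid arithmetic underflow while no watermark has been established.
        return fileWatermark == Long.MIN_VALUE ? Long.MIN_VALUE : fileWatermark - fileExpireTime;
    }
}
```

Note that a pruned file is only kept from being re-read because its
modification time is below the cutoff, which is exactly where late-visible
files and clock drift become a concern.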


Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

2021-06-08 Thread Arvid Heise
Hi Yangze,

I like the general approach to bind requirements to slotsharing groups. I
think the current approach is also flexible enough that a user could simply
use ParameterTool or similar to use config values and wire that with their
slotgroups, such that different requirements can be tested without
recompilation. So I don't see an immediate need to provide a generic
solution for yaml configuration for now.

Looking at the programmatic interface though, I think we could improve by
quite a bit and I haven't seen these alternatives being considered in the
FLIP:
1) Add new class SlotSharingGroup that incorporates all ResourceSpec
properties. Instead of using group names, the user could directly configure
such an object.

SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name
could also be omitted and auto-generated
ssg1.setCPUCores(4);
...
DataStream> grades =
GradeSource
.getSource(env, rate)

.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
.slotSharingGroup(ssg1);
DataStream> salaries =
SalarySource.getSource(env, rate)

.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
.slotSharingGroup(ssg2);

// run the actual window join program with the same slot sharing
group as grades
DataStream> joinedStream =
runWindowJoin(grades, salaries,
windowSize).slotSharingGroup(ssg1);

Note that we could make it backward compatible by changing the proposed
StreamExecutionEnvironment#setSlotSharingGroupResource to
StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup) and
then use the string name for further reference.

2) I'm also not sure on the StreamExecutionEnvironment#
setSlotSharingGroupResources. What's the benefit of the Map version over
having the simple setter? Even if the user has a map
slotSharingGroupResources, he could simply do
slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);

3) Is defining the ExternalResource part of this FLIP? I don't see a
Public* class yet. I'd be also fine to cut the scope of this FLIP and
remove it for now and annotate ResourceSpec/SlotSharingGroup evolving.

4) We should probably use a builder pattern around
ResourceSpec/SlotSharingGroup as in the current ResourceSpec. I don't think
we need to fully specify that in the FLIP but it would be good to at least
say how they should be created by the user.
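
For point 4, a builder could look roughly like the sketch below. Everything in
it is hypothetical: the class name SlotSharingGroupSpec, its two properties
and the default values are made up for illustration and are not the API
proposed in the FLIP.

```java
import java.util.Objects;

/** Hypothetical, immutable description of a slot sharing group and its resources. */
final class SlotSharingGroupSpec {
    private final String name;
    private final double cpuCores;
    private final int taskHeapMemoryMb;

    private SlotSharingGroupSpec(Builder builder) {
        this.name = builder.name;
        this.cpuCores = builder.cpuCores;
        this.taskHeapMemoryMb = builder.taskHeapMemoryMb;
    }

    static Builder newBuilder(String name) {
        return new Builder(name);
    }

    String getName() {
        return name;
    }

    double getCpuCores() {
        return cpuCores;
    }

    int getTaskHeapMemoryMb() {
        return taskHeapMemoryMb;
    }

    static final class Builder {
        private final String name;
        private double cpuCores = 1.0;      // illustrative default
        private int taskHeapMemoryMb = 128; // illustrative default

        private Builder(String name) {
            this.name = Objects.requireNonNull(name, "name");
        }

        Builder setCpuCores(double cpuCores) {
            if (cpuCores <= 0) {
                throw new IllegalArgumentException("cpuCores must be positive");
            }
            this.cpuCores = cpuCores;
            return this;
        }

        Builder setTaskHeapMemoryMb(int taskHeapMemoryMb) {
            if (taskHeapMemoryMb <= 0) {
                throw new IllegalArgumentException("taskHeapMemoryMb must be positive");
            }
            this.taskHeapMemoryMb = taskHeapMemoryMb;
            return this;
        }

        SlotSharingGroupSpec build() {
            return new SlotSharingGroupSpec(this);
        }
    }
}
```

A user would then write something like
SlotSharingGroupSpec.newBuilder("ssg-1").setCpuCores(4).build() and pass the
resulting object where a group name is passed today.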



On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo  wrote:

> @Yang
> In short, the external resources will participate in resource
> deduction and be logically ensured, but requesting an external
> resource must still be done through config options with the current
> default resource allocation strategy.
> In FLIP-56, we abstract the logic of resource allocation to the
> `ResourceAllocationStrategy`. Currently, with its default
> implementation, ResourceManager would still allocate TMs with the same
> resource spec and the external resources of it are configured through
> the config option as well. So, in your case, you need to define the
> "external-resources" and "external-resources.disk.amount". Then, all
> the disk requirements defined in the SSG will be logically ensured, as
> there is no slot level isolation. If the disk space of a task manager
> cannot fulfill the disk requirement, RM will allocate a new one.
> In the future, we'd like to introduce a `ResourceAllocationStrategy`
> which allocates heterogeneous TMs according to the requirements. Then,
> user only needs to define the driver of external resources when
> needed.
> Also, regarding the resource isolation, we may provide a fine-grained
> mode in which each slot can only fetch the information of external
> resources it requires in the future. But that is out of the scope of
> this PR.
>
> Best,
> Yangze Guo
>
> On Tue, Jun 8, 2021 at 4:20 PM Yang Wang  wrote:
> >
> > Thanks @Yangze for preparing this FLIP.
> >
> > I think this is a good starting point for community users to get a taste
> > of fine-grained resource management, which we all believe could improve
> > Flink job stability and cluster utilization.
> >
> > I have a simple question about the extended resources. Is it possible to
> > combine extended resources with fine-grained resource management? Besides
> > GPU, FPGA and other new computing devices, maybe the disk resource is a
> > more general use case. For example, different SSGs may have various disk
> > requirements based on the state. So we need to allocate enough ephemeral
> > storage resource for every TaskManager pod in a Kubernetes deployment.
> > Otherwise, it might be evicted due to running out of limits.
> >
> >
> > Best,
> > Yang
> >
> >
> > On Tue, Jun 8, 2021 at 1:47 PM Xintong Song wrote:
> >
> > > I think being able to specify fine grained resource requirements
> without
> > > having to change the codes and 

Re: [DISCUSS][Statebackend][Runtime] Changelog Statebackend Configuration Proposal

2021-06-08 Thread Yu Li
+1 for option 3.

IMHO persisting (operator's) state data through change log is an
independent mechanism which could co-work with all kinds of local state
stores (heap and rocksdb). This mechanism is similar to the WAL
(write-ahead-log) mechanism in database systems. Although implementation-wise
we're using wrapper (decorator) pattern and naming it as
`ChangeLogStateBackend`, it's not really another type of state backend. For
the same reason, ChangeLogStateBackend should be an internal class and not
exposed to the end user. Users only need to know / control whether to
enable change log or not, just like whether to enable WAL in the
traditional database system.
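
To illustrate the wrapper idea, here is a minimal, self-contained sketch. The
types KeyValueStore, InMemoryStore, ChangeLoggingStore and StoreFactory are
hypothetical stand-ins and not Flink's state backend classes; the point is
only that the user flips a boolean while the decorator stays internal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a local state store. */
interface KeyValueStore {
    void put(String key, String value);

    String get(String key);
}

/** Plays the role of a heap-based store. */
final class InMemoryStore implements KeyValueStore {
    private final Map<String, String> data = new HashMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }

    @Override
    public String get(String key) {
        return data.get(key);
    }
}

/** Decorator: records every change, then delegates to the wrapped store. */
final class ChangeLoggingStore implements KeyValueStore {
    private final KeyValueStore inner;
    private final List<String> changeLog = new ArrayList<>();

    ChangeLoggingStore(KeyValueStore inner) {
        this.inner = inner;
    }

    @Override
    public void put(String key, String value) {
        changeLog.add(key + "=" + value); // log first, similar in spirit to a WAL
        inner.put(key, value);
    }

    @Override
    public String get(String key) {
        return inner.get(key); // reads are served by the wrapped store
    }

    List<String> changeLog() {
        return Collections.unmodifiableList(changeLog);
    }
}

/** The only thing a user touches is the boolean flag. */
final class StoreFactory {
    static KeyValueStore create(boolean changelogEnabled) {
        KeyValueStore base = new InMemoryStore();
        return changelogEnabled ? new ChangeLoggingStore(base) : base;
    }
}
```

With this shape the wrapping class never appears in user-facing
configuration, which is the property option 3 aims for.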

Thanks.

Best Regards,
Yu


On Thu, 3 Jun 2021 at 22:50, Piotr Nowojski  wrote:

> Hi,
>
> I would actually prefer option 6 (or 5/4), for the sake of configuration
> being explicit and self explanatory. But at the same time I don't have very
> hard preferences and from the remaining options, option 3 seems the most
> reasonable.
>
> The question would be, do we want to expose to the users that
> ChangeLogStateBackend is wrapping an inner state backend or not? If not,
> option 3 is the best. If we do, if we want to teach the users and help them
> build the understanding of how things are working underneath, option 5 or 6
> are better.
>
> Best,
> Piotrek
>
> On Wed, Jun 2, 2021 at 04:36, Yun Tang wrote:
>
> > Hi Yuan, thanks for launching this discussion.
> >
> > I prefer option-3 as this is the easiest to understand for users.
> >
> >
> > Best
> > Yun Tang
> > 
> > From: Roman Khachatryan 
> > Sent: Monday, May 31, 2021 16:53
> > To: dev 
> > Subject: Re: [DISCUSS][Statebackend][Runtime] Changelog Statebackend
> > Configuration Proposal
> >
> > Hey Yuan, thanks for the proposal
> >
> > I think Option 3 is the simplest to use and exposes fewer details than any
> > other.
> > It's also consistent with the current way of configuring state
> > backends, as long as we treat change logging as a common feature
> > applicable to any state backend, like e.g.
> > state.backend.local-recovery.
> >
> > Option 6 seems slightly less preferable as it exposes more details but
> > I think is the most viable alternative.
> >
> > Regards,
> > Roman
> >
> >
> > On Mon, May 31, 2021 at 8:39 AM Yuan Mei  wrote:
> > >
> > > Hey all,
> > >
> > > We would like to start a discussion on how to enable/config Changelog
> > > Statebackend.
> > >
> > > As part of FLIP-158[1], Changelog state backend wraps on top of existing
> > > state backends (HashMapStateBackend, EmbeddedRocksDBStateBackend, and
> > > possibly more in the future) and delegates state changes to the
> > > underlying state backends. This thread is to discuss how Changelog
> > > StateBackend should be enabled and configured.
> > >
> > > Proposed options to enable/config state changelog are listed below:
> > >
> > > Option 1: Enable Changelog Statebackend through a Boolean Flag
> > >
> > > Option 2: Enable Changelog Statebackend through a Boolean Flag + a
> > > Special Case
> > >
> > > Option 3: Enable Changelog Statebackend through a Boolean Flag + W/O
> > > ChangelogStateBackend Exposed
> > >
> > > Option 4: Explicit Nested Configuration + “changelog.inner” prefix for
> > > inner backend
> > >
> > > Option 5: Explicit Nested Configuration + inner state backend
> > > configuration unchanged
> > >
> > > Option 6: Config Changelog and Inner Statebackend All-Together
> > >
> > > Details of each option can be found here:
> > > https://docs.google.com/document/d/13AaCf5fczYTDHZ4G1mgYL685FqbnoEhgo0cdwuJlZmw/edit?usp=sharing
> > >
> > > When considering these options, please consider these four dimensions:
> > > 1. Consistency
> > > API/config should follow a consistent model and should not have
> > > contradictory logic beneath
> > > 2. Simplicity
> > > API should be easy to use and not introduce too much burden on users
> > > 3. Explicitness
> > > API/config should not contain implicit assumptions and should be
> > > intuitive to users
> > > 4. Extensibility
> > > Whether the current setting can be easily extended in the foreseeable
> > > future
> > >
> > > Please let us know what you think and please keep the discussion in this
> > > mailing thread.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints
> > >
> > > Best
> > > Yuan
> >
>


How to unsubscribe?

2021-06-08 Thread Geldenhuys , Morgan Karl
How can I unsubscribe from this mailing list? The volume is just getting too 
much at the moment. Following the steps described on the website 
(https://flink.apache.org/community.html) did not appear to do anything.

Sorry for the spam and thanks in advance.


[jira] [Created] (FLINK-22921) SQL Client can't resolve the escape of "'" correctly.

2021-06-08 Thread Roc Marshal (Jira)
Roc Marshal created FLINK-22921:
---

 Summary: SQL Client can't resolve the escape of "'" correctly.
 Key: FLINK-22921
 URL: https://issues.apache.org/jira/browse/FLINK-22921
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Reporter: Roc Marshal
 Attachments: escape1.jpeg, escape2.jpeg, escape3.jpeg, escape4.jpeg, 
escape5.jpeg

Such as follows:

where =...

or

where  like ... !escape1.jpeg!





Re: [DISCUSS] Watermark propagation with Sink API

2021-06-08 Thread Till Rohrmann
Hi everyone,

I do agree that Flink's definition of idleness is not fully thought through
yet. Consequently, I would feel a bit uneasy to make it part of Flink's API
right now. Instead, defining the proper semantics first and then exposing
it sounds like a good approach forward. Hence, +1 for option number 1,
which will also allow FLIP-167 to make progress.

Concerning subtasks with no partitions assigned, would it make sense to
terminate these tasks at some point? That way, the stream would be closed
and there is no need to maintain a stream status. Of course, this also
requires at some point that Flink can start new sources when new partitions
appear.

Cheers,
Till

On Tue, Jun 8, 2021 at 9:26 AM Piotr Nowojski 
wrote:

> Hi Eron,
>
> The FLIP-167 is narrow, but we recently discovered some problems with
> current idleness semantics as Arvid explained. We are planning to present a
> new proposal to redefine them. Probably as a part of it, we would need to
> rename them. Given that, I think it doesn't make sense to expose idleness
> to the sinks before we rename and define it properly. In other words:
>
> > 2. When the sink operator is idled, tell the sink function.
>
> We shouldn't expose stream status as a part of public API until it's
> properly defined.
>
> I would propose one of the two things:
> 1. Proceed with FLIP-167, without exposing idleness in the sinks YET.
> Exposing idleness could be part of this next/future FLIP that would define
> idleness in the first place.
> 2. Block FLIP-167, until the idleness is fixed.
>
> I would vote for option number 1.
>
> Piotrek
>
> On Mon, Jun 7, 2021 at 18:08, Eron Wright wrote:
>
> > Piotr, David, and Arvid, we've had an expansive discussion but ultimately
> > the proposal is narrow.  It is:
> > 1. When a watermark arrives at the sink operator, tell the sink function.
> > 2. When the sink operator is idled, tell the sink function.
> >
> > With these enhancements, we will significantly improve correctness in
> > multi-stage flows, and facilitate an exciting project in the Pulsar
> > community.  Would you please lend your support to FLIP-167 so that we can
> > land this enhancement for 1.14?  My deepest thanks!
> >
> > -Eron
> >
> >
> >
> >
> > On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise  wrote:
> >
> > > Hi Eron,
> > >
> > > > you either have very specific use cases in mind or have a misconception
> > > > about idleness in Flink with the new sources. The basic idea is that
> > > > you have watermark generators only at the sources and the user supplies
> > > > them. As a source author, you have no option to limit that. Here is a
> > > > bit of background:
> > > >
> > > > We observed that many users who read from Kafka were confused about no
> > > > visible progress in their Flink applications because of some idle
> > > > partition, and we subsequently introduced idleness. Idleness was always
> > > > considered a means to achieve progress at the risk of losing a bit of
> > > > correctness. So especially in the case that you describe, with a Pulsar
> > > > partition that is empty but indefinitely active, the user needs to be
> > > > able to use idleness such that downstream window operators progress.
> > > >
> > > > I hope to have clarified that "I wouldn't recommend using
> > > > withIdleness() with source-based watermarks." would pretty much make
> > > > the intended use case not work anymore.
> > >
> > > ---
> > >
> > > > Nevertheless, from the discussion with you and some offline discussion
> > > > with Piotr and Dawid, we actually found quite a few drawbacks of the
> > > > current definition of idleness:
> > > > - We currently only use idleness to exclude respective upstream tasks
> > > > from participating in watermark generation (as you have eloquently put
> > > > further up in the thread).
> > > > - However, the definition is bound to records. So while a partition is
> > > > idle, no records should be produced.
> > > > - That brings us into quite a few edge cases, where operators emit
> > > > records while they are actually idling: think of timers, asyncIO
> > > > operators, window operators based on timeouts, etc.
> > > > - The solution would be to turn the operator active while emitting and
> > > > return to being idle afterwards (but when?). However, this has some
> > > > unintended side-effects depending on when you switch back.
> > > >
> > > > We are currently thinking that we should rephrase the definition to
> > > > what you described:
> > > > - A channel that is active is providing watermarks.
> > > > - An idle channel is not providing any watermarks but can deliver
> > > > records.
> > > > - Then we are not talking about idle partitions anymore but explicit
> > > > and implicit watermark generation and should probably rename the
> > > > concepts.
> > > > - This would probably mean that we also need an explicit markActive in
> > > > source/sink to express that the respective entity now needs to wait for
> > > > explicit watermarks.
> > >
> > > I'll open a proper discussion thread 
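
The rephrased definition in the message above (an active channel provides
watermarks; an idle channel provides none but may still deliver records) can
be sketched roughly as follows. This is only an illustration under stated
assumptions: the class IdleAwareWatermarkCombiner and its method names are
hypothetical and are not Flink APIs. The sketch combines per-channel
watermarks by taking the minimum over the active channels, and a record never
changes a channel's status.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch (not a Flink class): combines per-channel watermarks,
 * ignoring channels that are currently marked idle.
 */
class IdleAwareWatermarkCombiner {
    private final long[] watermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE;

    IdleAwareWatermarkCombiner(int numChannels) {
        watermarks = new long[numChannels];
        idle = new boolean[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** A watermark switches the channel back to active (the proposed rule). */
    long onWatermark(int channel, long timestamp) {
        idle[channel] = false;
        watermarks[channel] = Math.max(watermarks[channel], timestamp);
        return recompute();
    }

    /** A record does NOT change the channel status; idle stays idle. */
    long onRecord(int channel) {
        return combined;
    }

    /** Marks a channel idle, excluding it from the minimum. */
    long onIdle(int channel) {
        idle[channel] = true;
        return recompute();
    }

    /** Minimum over active channels; the combined watermark never regresses. */
    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive && min > combined) {
            combined = min;
        }
        return combined;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkCombiner combiner = new IdleAwareWatermarkCombiner(2);
        combiner.onWatermark(0, 10L);
        combiner.onIdle(1);
        System.out.println(combiner.onRecord(1));
    }
}
```

In this sketch a late record on an idle channel leaves the combined watermark
untouched, which is the behavior the edge cases listed above (timers, asyncIO,
timeout-based windows) would need.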

Re: [DISCUSS] Limit size of already processed files in File Source SplitEnumerator

2021-06-08 Thread Till Rohrmann
Hi Tianxin,

thanks for starting this discussion. I am pulling in Arvid who works on
Flink's connectors.

I think the problem you are describing can happen.

From what I understand you are proposing to keep track of the watermark of
processed file input splits and then filter out splits based on their
modification timestamps and the watermark. What is the benefit of keeping
for every split the modification timestamp in the map? Could it also work
if we sort the input splits according to their modification timestamps and
then remember the last processed split? That way we only remember a single
value and upon recovery, we only process those splits which have a newer
modification timestamp.
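
A minimal sketch of the single-value alternative described above, assuming
hypothetical FileEntry and ModificationTimeFilter classes (neither is part of
the Flink File Source): newly discovered files are sorted by modification
timestamp, and only the timestamp of the last processed file is kept as state.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for a discovered file; not a Flink class. */
class FileEntry {
    final String path;
    final long modificationTime;

    FileEntry(String path, long modificationTime) {
        this.path = path;
        this.modificationTime = modificationTime;
    }
}

/** Hypothetical filter that remembers only the newest processed timestamp. */
class ModificationTimeFilter {
    private long lastProcessedModificationTime = Long.MIN_VALUE;

    /** Returns unseen files, oldest first, and advances the remembered timestamp. */
    List<FileEntry> selectNew(Collection<FileEntry> discovered) {
        List<FileEntry> fresh = new ArrayList<>();
        for (FileEntry file : discovered) {
            if (file.modificationTime > lastProcessedModificationTime) {
                fresh.add(file);
            }
        }
        fresh.sort(Comparator.comparingLong((FileEntry f) -> f.modificationTime));
        if (!fresh.isEmpty()) {
            lastProcessedModificationTime = fresh.get(fresh.size() - 1).modificationTime;
        }
        return fresh;
    }

    /** The single value that would go into a checkpoint. */
    long snapshot() {
        return lastProcessedModificationTime;
    }

    /** Restores the remembered timestamp after recovery. */
    void restore(long checkpointedTimestamp) {
        lastProcessedModificationTime = checkpointedTimestamp;
    }

    public static void main(String[] args) {
        ModificationTimeFilter filter = new ModificationTimeFilter();
        List<FileEntry> discovered = new ArrayList<>();
        discovered.add(new FileEntry("a", 30L));
        discovered.add(new FileEntry("b", 10L));
        for (FileEntry file : filter.selectNew(discovered)) {
            System.out.println(file.path + " @ " + file.modificationTime);
        }
    }
}
```

One trade-off of this sketch: a file that shows up later with a modification
timestamp equal to or older than the remembered value is skipped, so it only
fits inputs where files appear in timestamp order.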

Cheers,
Till

On Tue, Jun 8, 2021 at 12:11 AM Tianxin Zhao  wrote:

> Hi!
>
> Currently Flink File Source relies on a Set pathsAlreadyProcessed in
> SplitEnumerator to decide which file has been processed and avoids
> reprocessing files if a file is already in this set. However this set could
> be ever growing and ultimately exceed memory size if there are new files
> continuously added to the input path.
>
> I submitted https://issues.apache.org/jira/browse/FLINK-22792 and would
> like to be assigned to the ticket.
>
> The currently proposed change is as follows; I would like to get agreement
> on the approach taken.
>
>    1. Maintain a fileWatermark, updated from the modification time of new
>    files, in ContinuousFileSplitEnumerator.
>    2. Change the Set pathsAlreadyProcessed to a HashMap
>    pathsAlreadyProcessed, where the key is the same as before (the file
>    path of an already processed file) and the value is the file
>    modification time; expose a getModificationTime() method on
>    FileSourceSplit.
>    3. Add a user-configurable fileExpireTime option; any file older than
>    fileWatermark - fileExpireTime is ignored.
>    4. When snapshotting the SplitEnumerator, remove files that are older
>    than fileWatermark - fileExpireTime from the pathsAlreadyProcessed map.
>    5. Add the alreadyProcessedPaths map and fileWatermark to
>    PendingSplitsCheckpoint, and modify the current
>    PendingSplitsCheckpointSerializer to add a version 2 serializer that
>    serializes the alreadyProcessedPaths map, including the file
>    modification times.
>    6. Subclasses of PendingSplitsCheckpoint, such as
>    ContinuousHivePendingSplitsCheckpoint, would not be impacted: they
>    initialize an empty alreadyProcessedMap and 0 as the initial watermark.
>
> Thanks!
>


[jira] [Created] (FLINK-22920) Guava version conflict in flink-format module

2021-06-08 Thread sujun (Jira)
sujun created FLINK-22920:
-

 Summary: Guava version conflict in flink-format module
 Key: FLINK-22920
 URL: https://issues.apache.org/jira/browse/FLINK-22920
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ORC
Affects Versions: 1.14.0
Reporter: sujun


In the Flink-ORC and Flink-Parquet modules, the hadoop-common dependency 
contains the 11.0.2 version of guava, which conflicts with the 29.0-jre version 
required by the flink-table-planner-blink module. We should exclude guava from 
the hadoop-common dependency. Otherwise, running the unit tests through the IDE 
throws a NoClassDefFoundError.





[jira] [Created] (FLINK-22919) Remove support for Hadoop1.x in HadoopInputFormatCommonBase.getCredentialsFromUGI

2021-06-08 Thread Junfan Zhang (Jira)
Junfan Zhang created FLINK-22919:


 Summary: Remove support for Hadoop1.x in 
HadoopInputFormatCommonBase.getCredentialsFromUGI
 Key: FLINK-22919
 URL: https://issues.apache.org/jira/browse/FLINK-22919
 Project: Flink
  Issue Type: Improvement
Reporter: Junfan Zhang








Re: [DISCUSS] Feedback Collection Jira Bot

2021-06-08 Thread Till Rohrmann
I like this idea. It would then be the responsibility of the component
maintainers to manage the lifecycle explicitly.

Cheers,
Till

On Mon, Jun 7, 2021 at 1:48 PM Arvid Heise  wrote:

> One more idea for the bot. Could we have a label to exclude certain tickets
> from the life-cycle?
>
> I'm thinking about long-term tickets such as improving DataStream to
> eventually replace DataSet. We would collect ideas over the next couple of
> weeks without any visible progress on the implementation.
>
> On Fri, May 21, 2021 at 2:06 PM Konstantin Knauf 
> wrote:
>
> > Hi Timo,
> >
> > > Thanks for joining the discussion. All rules except the unassigned rule
> > > do not apply to Sub-Tasks actually (like deprioritization, closing).
> > > Additionally, activity on a Sub-Task counts as activity for the parent.
> > > So, the parent ticket would not be touched by the bot as long as there
> > > is a single Sub-Task that has a discussion or an update. If you
> > > experience something different, this is a bug.
> > >
> > > Is there a reason why it is important to assign all Sub-Tasks to the same
> > > person immediately? I am not sure if this kind of "reserving tickets" is
> > > a good idea in general to be honest.
> >
> > Cheers,
> >
> > Konstantin
> >
> >
> >
> >
> >
> > On Fri, May 21, 2021 at 12:00 PM Timo Walther 
> wrote:
> >
> > > Hi Konstantin,
> > >
> > > thanks for starting this discussion. I was also about to provide some
> > > feedback because I have the feeling that the bot is too aggressive at
> > > the moment.
> > >
> > > > Even a 14-day interval is a short period of time for bigger efforts
> > > > that might include several subtasks. Currently, if we split an issue
> > > > into subtasks, usually most subtasks are assigned to the same person.
> > > > But the bot requires us to update all subtasks again after 7 days.
> > > > Could we disable the bot for subtasks or extend the period to 30 days?
> > > >
> > > > The core problem in the past was that we had issues lying around
> > > > untouched for years. Luckily, this is solved with the bot now. But
> > > > going from years to 7 days spams the mail box quite a bit.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 21.05.21 09:22, Konstantin Knauf wrote:
> > > > Hi Robert,
> > > >
> > > > Could you elaborate on your comment on test instabilities? Would test
> > > > instabilities always get a fixVersion then?
> > > >
> > > > > Background: Test instabilities are supposed to be Critical. Critical
> > > > > tickets are deprioritized if they are unassigned and have not
> > > > > received an update for 14 days.
> > > >
> > > > Cheers,
> > > >
> > > > Konstantin
> > > >
> > > >
> > > >
> > > > On Thu, May 20, 2021 at 9:34 AM Robert Metzger 
> > > wrote:
> > > >
> > > >> +1
> > > >> This would also cover test instabilities, which I personally believe
> > > should
> > > >> not be auto-deprioritized until they've been analyzed.
> > > >>
> > > >> On Wed, May 19, 2021 at 1:46 PM Till Rohrmann  >
> > > >> wrote:
> > > >>
> > > >>> I like this idea. +1 for your proposal Konstantin.
> > > >>>
> > > >>> Cheers,
> > > >>> Till
> > > >>>
> > > > >>> On Wed, May 19, 2021 at 1:30 PM Konstantin Knauf <konstan...@ververica.com>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Hi everyone,
> > > > >>>>
> > > > >>>> Till and I recently discussed whether we should disable the
> > > > >>>> "stale-blocker", "stale-critical", "stale-major" and "stale-minor"
> > > > >>>> rules for tickets that have a fixVersion set. This would allow
> > > > >>>> people to plan the upcoming release without tickets being
> > > > >>>> deprioritized by the bot during the release cycle.
> > > > >>>>
> > > > >>>> From my point of view, this is a good idea as long as we can agree
> > > > >>>> to use the "fixVersion" a bit more conservatively. What do I mean
> > > > >>>> by that? If you would categorize tickets planned for an upcoming
> > > > >>>> release into:
> > > > >>>>
> > > > >>>> * Must Have
> > > > >>>> * Should Have
> > > > >>>> * Nice-To-Have
> > > > >>>>
> > > > >>>> only "Must Have" and "Should Have" tickets should get a fixVersion.
> > > > >>>> From my observation, we currently often set the fixVersion if we
> > > > >>>> just wished a feature was included in an upcoming release.
> > > > >>>> Similarly, I often see bulk changes of fixVersion that "roll over"
> > > > >>>> many tickets to the next release if they have not made it into the
> > > > >>>> previous release although there is no concrete plan to fix them or
> > > > >>>> they have even become obsolete by then. Excluding those from the
> > > > >>>> bot would be counterproductive.
> > > > >>>>
> > > > >>>> What do you think?
> > > > >>>>
> > > > >>>> Cheers,
> > > > >>>>
> > > > >>>> Konstantin
> > > > >>>>
> > > > >>>>
> > > > >>>> On Fri, Apr 23, 2021 at 2:25 PM Konstantin Knauf <kna...@apache.org>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Hi everyone,
> > > > >>>>>
> > > > >>>>> After some offline 

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Till Rohrmann
Thanks for the update Yingjie. Would it make sense to write a short blog
post about this feature including some performance improvement numbers? I
think this could be interesting to our users.

Cheers,
Till

On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li  wrote:

> Thanks Yingjie for the great effort!
>
> This is really helpful to Flink Batch users!
>
> Best,
> Jingsong
>
> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao 
> wrote:
>
> > Hi devs & users,
> >
> > The FLIP-148[1] has been released with Flink 1.13 and the final
> > implementation has some differences compared with the initial proposal in
> > the FLIP document. To avoid potential misunderstandings, I have updated
> > the FLIP document[1] accordingly and I also drafted another document[2]
> > which contains more implementation details.  FYI.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> > [2]
> > https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
> >
> > Best,
> > Yingjie
> >
> > On Thu, Oct 15, 2020 at 11:02 AM Yingjie Cao wrote:
> >
> >> Hi devs,
> >>
> >> Currently, Flink adopts a hash-style blocking shuffle implementation
> >> which writes data sent to different reducer tasks into separate files
> >> concurrently. Compared to the sort-merge based approach, which writes
> >> that data together into a single file and merges small files into
> >> bigger ones, the hash-based approach has several weak points when it
> >> comes to running large scale batch jobs:
> >>
> >>    1. *Stability*: For high parallelism (tens of thousands) batch jobs,
> >>    the current hash-based blocking shuffle implementation writes too
> >>    many files concurrently, which puts high pressure on the file system,
> >>    for example, maintenance of too many file metas, exhaustion of inodes
> >>    or file descriptors. All of these can be potential stability issues.
> >>    Sort-merge based blocking shuffle doesn't have the problem because
> >>    for one result partition, only one file is written at the same time.
> >>    2. *Performance*: Large amounts of small shuffle files and random IO
> >>    can influence shuffle performance a lot, especially for HDD (for SSD,
> >>    sequential read is also important because of read ahead and cache).
> >>    For batch jobs processing massive data, a small amount of data per
> >>    subpartition is common because of high parallelism. Besides, data
> >>    skew is another cause of small subpartition files. By merging data of
> >>    all subpartitions together in one file, more sequential reads can be
> >>    achieved.
> >>    3. *Resource*: For the current hash-based implementation, each
> >>    subpartition needs at least one buffer. For large scale batch
> >>    shuffles, the memory consumption can be huge. For example, we need at
> >>    least 320M network memory per result partition if parallelism is set
> >>    to 10000. Because of the huge network memory consumption, it is hard
> >>    to configure the network memory for large scale batch jobs, and
> >>    sometimes parallelism cannot be increased just because of
> >>    insufficient network memory, which leads to a bad user experience.
> >>
> >> To improve Flink’s capability of running large scale batch jobs, we
> >> would like to introduce sort-merge based blocking shuffle to Flink[1].
> >> Any feedback is appreciated.
> >>
> >> [1]
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> >>
> >> Best,
> >> Yingjie
> >>
> >
>
> --
> Best, Jingsong Lee
>


[DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

2021-06-08 Thread Senhong Liu
Hi guys,

We would like to start a discussion on the new FLIP about rejecting
checkpoints on the operator level. The basic idea is to allow an operator
to reject a checkpoint when it is not in a suitable state, and to return
a proper failure reason.

http://cwiki.apache.org/confluence/display/FLINK/FLIP-170+Adding+Checkpoint+Rejection+Mechanism

  

Looking forward to your participation!

Best,
Senhong





Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

2021-06-08 Thread Yangze Guo
@Yang
In short, the external resources will participate in resource
deduction and be logically ensured, but requesting an external
resource must still be done through config options with the current
default resource allocation strategy.
In FLIP-56, we abstract the logic of resource allocation to the
`ResourceAllocationStrategy`. Currently, with its default
implementation, ResourceManager would still allocate TMs with the same
resource spec and the external resources of it are configured through
the config option as well. So, in your case, you need to define the
"external-resources" and "external-resource.disk.amount". Then, all
the disk requirements defined in the SSG will be logically ensured, as
there is no slot level isolation. If the disk space of a task manager
cannot fulfill the disk requirement, RM will allocate a new one.
In the future, we'd like to introduce a `ResourceAllocationStrategy`
which allocates heterogeneous TMs according to the requirements. Then,
users only need to define the driver of external resources when
needed.
Also, regarding the resource isolation, we may provide a fine-grained
mode in which each slot can only fetch the information of external
resources it requires in the future. But that is out of the scope of
this PR.

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 4:20 PM Yang Wang  wrote:
>
> Thanks @Yangze for preparing this FLIP.
>
> I think this is a good starting point for community users to get a taste
> of fine-grained resource management, which we all believe could improve
> Flink job stability and cluster utilization.
>
> I have a simple question about the extended resources. Is it possible to
> combine extended resources with fine-grained resource management? Besides
> GPUs, FPGAs and other new computing devices, maybe the disk resource is a
> more general use case. For example, different SSGs may have various disk
> requirements based on the state. So we need to allocate enough ephemeral
> storage for every TaskManager pod in a Kubernetes deployment. Otherwise,
> it might be evicted due to running out of limits.
>
>
> Best,
> Yang
>
>
> On Tue, Jun 8, 2021 at 1:47 PM Xintong Song wrote:
>
> > I think being able to specify fine grained resource requirements without
> > having to change the codes and recompile the job is indeed a good idea. It
> > definitely improves the usability.
> >
> > However, this requires a more careful design, which probably deserves a
> > separate thread. It'd be good to have that discussion, but maybe not block
> > this feature on that.
> >
> > One idea concerning the configuration approach: As Yangze said, flink
> > configuration options are supposed to take effect at cluster level. For
> > updating job level specifics that are not suitable to be introduced as a
> > config option, currently the only way is to pass them as program arguments.
> > Would it make sense to introduce a general approach for overwriting such
> > job specifics without re-compiling the job?
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo  wrote:
> >
> > > @Wenlong
> > > After another consideration, the config option approach I mentioned
> > > above might not be appropriate. The resource requirements for SSG
> > > should be a job level configuration and should not be set in the
> > > flink-conf.
> > >
> > > I think we can define a JSON format, which would be the ResourceSpecs
> > > mapped by the name of SSGs, for the resource requirements of a
> > > specific job. Then, we allow users to configure the file path of that
> > > JSON. The JSON will only be parsed at runtime, which allows users to
> > > tune it without re-compiling the job.
> > >
> > > We can add another #setSlotSharingGroupResources for configuring the
> > > file path of that JSON:
> > > ```
> > > /**
> > >  * Specify fine-grained resource requirements for slot sharing groups
> > >  * with the given resource JSON file. The existing resource
> > >  * requirement of the same slot sharing group will be replaced.
> > >  */
> > > public StreamExecutionEnvironment setSlotSharingGroupResources(
> > >         String pathToResourceJson);
> > > ```
> > >
> > > WDYT?
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo  wrote:
> > > >
> > > > Thanks for the feedback, Xintong and Wenlong!
> > > >
> > > > @Wenlong
> > > > I think that is a good idea; adjusting the resources without
> > > > re-compiling the job will facilitate the tuning process.
> > > > We can define a pattern "slot-sharing-group.resource.{ssg name}"
> > > > (welcome any proposal for the prefix naming) for the resource spec
> > > > config of a slot sharing group. Then, users can set the ResourceSpec of
> > > > SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0,
> > > > heap: 100m, off-heap: 100m}". WDYT?
> > > >
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl 
> > > wrote:
> > > > >
> > > > 

[jira] [Created] (FLINK-22918) StreamingFileSink does not commit partition when no message is sent between checkpoints

2021-06-08 Thread lihe ma (Jira)
lihe ma created FLINK-22918:
---

 Summary: StreamingFileSink does not commit partition when no 
message is sent between checkpoints
 Key: FLINK-22918
 URL: https://issues.apache.org/jira/browse/FLINK-22918
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Affects Versions: 1.11.3
Reporter: lihe ma


FileSink extracts the partitions from messages, then sends the partition list 
to the committer. In the dynamic partition case, if one partition receives no 
data between two checkpoints, the partition won't be committed.

For example:
date=0101/hour=01/key=a/part1-file  (written at 01-01 01:20:00)

If there is no data with key=a between 01:20 and 02:10 (a time by which the 
watermark has passed), the committer will not receive this partition and it 
will never be committed.
 

  





Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

2021-06-08 Thread Yang Wang
Thanks @Yangze for preparing this FLIP.

I think this is a good starting point for community users to get a taste
of fine-grained resource management, which we all believe could improve
Flink job stability and cluster utilization.

I have a simple question about the extended resources. Is it possible to
combine extended resources with fine-grained resource management? Besides
GPUs, FPGAs and other new computing devices, maybe the disk resource is a
more general use case. For example, different SSGs may have various disk
requirements based on the state. So we need to allocate enough ephemeral
storage for every TaskManager pod in a Kubernetes deployment. Otherwise,
it might be evicted due to running out of limits.


Best,
Yang


On Tue, Jun 8, 2021 at 1:47 PM Xintong Song wrote:

> I think being able to specify fine grained resource requirements without
> having to change the codes and recompile the job is indeed a good idea. It
> definitely improves the usability.
>
> However, this requires a more careful design, which probably deserves a
> separate thread. It'd be good to have that discussion, but maybe not block
> this feature on that.
>
> One idea concerning the configuration approach: As Yangze said, flink
> configuration options are supposed to take effect at cluster level. For
> updating job level specifics that are not suitable to be introduced as a
> config option, currently the only way is to pass them as program arguments.
> Would it make sense to introduce a general approach for overwriting such
> job specifics without re-compiling the job?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo  wrote:
>
> > @Wenlong
> > After another consideration, the config option approach I mentioned
> > above might not be appropriate. The resource requirements for SSG
> > should be a job level configuration and should not be set in the
> > flink-conf.
> >
> > I think we can define a JSON format, which would be the ResourceSpecs
> > mapped by the name of SSGs, for the resource requirements of a
> > specific job. Then, we allow users to configure the file path of that
> > JSON. The JSON will only be parsed at runtime, which allows users to
> > tune it without re-compiling the job.
> >
> > We can add another #setSlotSharingGroupResources for configuring the
> > file path of that JSON:
> > ```
> > /**
> >  * Specify fine-grained resource requirements for slot sharing groups
> >  * with the given resource JSON file. The existing resource
> >  * requirement of the same slot sharing group will be replaced.
> >  */
> > public StreamExecutionEnvironment setSlotSharingGroupResources(
> >         String pathToResourceJson);
> > ```
> >
> > WDYT?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo  wrote:
> > >
> > > Thanks for the feedback, Xintong and Wenlong!
> > >
> > > @Wenlong
> > > I think that is a good idea; adjusting the resources without
> > > re-compiling the job will facilitate the tuning process.
> > > We can define a pattern "slot-sharing-group.resource.{ssg name}"
> > > (welcome any proposal for the prefix naming) for the resource spec
> > > config of a slot sharing group. Then, users can set the ResourceSpec of
> > > SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0,
> > > heap: 100m, off-heap: 100m}". WDYT?
> > >
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl 
> > wrote:
> > > >
> > > > Thanks Yangze for the FLIP, it is great for users to be able to
> > > > declare the fine-grained resource requirements for the job.
> > > >
> > > > I have one minor suggestion: can we support setting resource
> > > > requirements by configuration? Currently most of the config options in
> > > > the execution config can be configured by configuration, and it is very
> > > > likely that users need to adjust the resources according to the
> > > > performance of their job during debugging. Providing a configuration
> > > > way will make it more convenient.
> > > >
> > > > Bests,
> > > > Wenlong Lyu
> > > >
> > > > On Thu, 3 Jun 2021 at 15:59, Xintong Song 
> > wrote:
> > > >
> > > > > Thanks Yangze for preparing the FLIP.
> > > > >
> > > > > The proposed changes look good to me.
> > > > >
> > > > > As you've mentioned in the implementation plan, I believe one of
> > > > > the most important tasks of this FLIP is to have the feature well
> > > > > documented. It would be really nice if we can keep that in mind and
> > > > > start drafting the documentation early.
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo 
> > wrote:
> > > > >
> > > > > > Hi, there,
> > > > > >
> > > > > > We would like to start a discussion thread on "FLIP-169:
> DataStream
> > > > > > API for Fine-Grained Resource Requirements"[1], where we propose
> > the
> > > > > > DataStream API for specifying fine-grained 

Re: Re: Add control mode for flink

2021-06-08 Thread 刘建刚
Thanks for the reply. It is a good question. There are multiple options, as
follows:

   1. We can persist control signals in HighAvailabilityServices and replay
   them after failover.
   2. Only tell the users that the control signals take effect after they
   are checkpointed.


On Tue, Jun 8, 2021 at 2:15 PM Steven Wu [via Apache Flink User Mailing List
archive.] <ml+s2336050n44278...@n4.nabble.com> wrote:

>
> I can see the benefits of control flow. E.g., it might help the old (and
> inactive) FLIP-17 side input. I would suggest that we add more details of
> some of the potential use cases.
>
> Here is one mismatch with using control flow for dynamic config. Dynamic
> config is typically targeted at/loaded by one specific operator. Control
> flow will propagate the dynamic config to all operators, which is not a
> problem per se.
>
> Regarding using the REST API (to the JobManager) for accepting control
> signals from external systems, where are we going to persist/checkpoint
> the signal? The JobManager can die before the control signal is propagated
> and checkpointed. Do we lose the control signal in this case?
>
>
> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]> wrote:
>
>> +1 on separating the effort into two steps:
>>
>>1. Introduce a common control flow framework, with flexible
>>interfaces for generating / reacting to control messages for various
>>purposes.
>>    2. Features that leverage the control flow can be worked on
>>    concurrently
>>
>> Meanwhile, continuing to collect potential features that may leverage the
>> control flow should be helpful. It provides good input for the control
>> flow framework design, to make the framework common enough to cover the
>> potential use cases.
>>
>> My suggestions on the next steps:
>>
>>1. Allow more time for opinions to be heard and potential use cases
>>to be collected
>>2. Draft a FLIP with the scope of common control flow framework
>>3. We probably need a poc implementation to make sure the framework
>>covers at least the following scenarios
>>   1. Produce control events from arbitrary operators
>>   2. Produce control events from JobMaster
>>   3. Consume control events from arbitrary operators downstream
>>   where the events are produced
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
>>
>>> Many thanks to Jiangang for bringing this up, and many thanks for the
>>> discussion!
>>>
>>> I also agree with the summary by Xintong and Jing that control flow
>>> seems to be a common building block for many functionalities, and a
>>> dynamic configuration framework is a representative application that is
>>> frequently required by users. Regarding the control flow, we are
>>> currently also considering the design of iteration for flink-ml, and as
>>> Xintong has pointed out, it also requires the control flow in cases like
>>> detecting global termination inside the iteration (in this case we need
>>> to broadcast an event through the iteration body to detect whether there
>>> are still records residing in the iteration body). And regarding whether
>>> to implement the dynamic configuration framework, I also agree with
>>> Xintong that the consistency guarantee would be a point to consider; we
>>> might consider whether we need to ensure every operator receives the
>>> dynamic configuration.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>> --
>>> Sender: kai wang <[hidden email]>
>>> Date: 2021/06/08 11:52:12
>>> Recipient: JING ZHANG <[hidden email]>
>>> Cc: 刘建刚 <[hidden email]>; Xintong Song [via Apache Flink User Mailing
>>> List archive.] <[hidden email]>; user <[hidden email]>; dev <[hidden
>>> email]>
>>> Subject: Re: Add control mode for flink
>>>
>>>
>>>
>>> I'm a big +1 for this feature.
>>>
>>>1. Limit the input qps.
>>>2. Change log level for debug.
>>>
>>> In my team, the two examples above are needed.
>>>
>>> On Tue, Jun 8, 2021 at 11:18 AM JING ZHANG <[hidden email]> wrote:
>>>
>>>> Thanks Jiangang for bringing this up.
>>>> As mentioned in Jiangang's email, the `dynamic configuration framework`
>>>> provides many useful functions in Kuaishou, because it can update job
>>>> behavior without relaunching the job. The functions are very popular in
>>>> Kuaishou, and we also see similar demands on the mailing list [1].
>>>>
>>>> I'm a big +1 for this feature.
>>>>
>>>> Thanks Xintong and Yun for deep thoughts about the issue. I like the
>>>> idea about introducing control mode 

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

2021-06-08 Thread Yangze Guo
@Xintong
> introduce a general approach for overwriting such job specifics without 
> re-compiling the job
I think that would be a good direction. Just sharing my two cents on this
topic. I'd divide the job-level specifics into two categories:
- Specifics which affect how Flink executes the job, e.g.
"parallelism.default". Currently, most of these specifics have a
corresponding config option.
- Job-specific arguments, e.g. the "input" of our WordCount example.
Those can only be passed as program arguments.
It might be good to have a general approach for overwriting all the
above arguments. One preliminary idea is introducing a separate
"job-conf.yaml".

All in all, I agree that this topic requires a more careful design and
deserves a separate discussion thread.
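
As a rough illustration of how a separate job-level file could carry
per-SSG overrides, here is a minimal sketch that reads entries following the
"slot-sharing-group.resource.{ssg name}" pattern quoted further down in this
thread. Everything in it is an assumption for discussion: the key pattern is
only a proposal, the class SsgResourceConfigParser is hypothetical, and the
values are kept as plain strings instead of being converted into a
ResourceSpec.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical parser for the proposed per-SSG resource keys; not a Flink class. */
class SsgResourceConfigParser {
    /** Proposed (not existing) key prefix from this discussion thread. */
    static final String PREFIX = "slot-sharing-group.resource.";

    /** Collects {ssg name -> {field -> raw value}} from flat key/value config entries. */
    static Map<String, Map<String, String>> parse(Map<String, String> config) {
        Map<String, Map<String, String>> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (!entry.getKey().startsWith(PREFIX)) {
                continue; // unrelated option, e.g. parallelism.default
            }
            String ssgName = entry.getKey().substring(PREFIX.length());
            result.put(ssgName, parseInlineMap(entry.getValue()));
        }
        return result;
    }

    /** Parses a value of the form "{cpu: 1.0, heap: 100m}" into ordered fields. */
    static Map<String, String> parseInlineMap(String value) {
        String body = value.trim();
        if (!body.startsWith("{") || !body.endsWith("}")) {
            throw new IllegalArgumentException("Expected {key: value, ...} but got: " + value);
        }
        body = body.substring(1, body.length() - 1);
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : body.split(",")) {
            if (pair.trim().isEmpty()) {
                continue;
            }
            String[] keyValue = pair.split(":", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Malformed entry: " + pair);
            }
            fields.put(keyValue[0].trim(), keyValue[1].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("parallelism.default", "4");
        config.put(PREFIX + "ssg1", "{cpu: 1.0, heap: 100m, off-heap: 100m}");
        System.out.println(parse(config));
    }
}
```

Because the file would be read when the job is submitted rather than compiled
into it, the values could be tuned without rebuilding the job, which is the
usability point raised in this thread.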

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 1:47 PM Xintong Song  wrote:
>
> I think being able to specify fine grained resource requirements without
> having to change the codes and recompile the job is indeed a good idea. It
> definitely improves the usability.
>
> However, this requires a more careful design, which probably deserves a
> separate thread. It'd be good to have that discussion, but maybe not block
> this feature on that.
>
> One idea concerning the configuration approach: As Yangze said, flink
> configuration options are supposed to take effect at cluster level. For
> updating job level specifics that are not suitable to be introduced as a
> config option, currently the only way is to pass them as program arguments.
> Would it make sense to introduce a general approach for overwriting such
> job specifics without re-compiling the job?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo  wrote:
>
> > @Wenlong
> > After another consideration, the config option approach I mentioned
> > above might not be appropriate. The resource requirements for SSG
> > should be a job level configuration and should not be set in the
> > flink-conf.
> >
> > I think we can define a JSON format, which would be the ResourceSpecs
> > mapped by the name of SSGs, for the resource requirements of a
> > specific job. Then, we allow users to configure the file path of that
> > JSON. The JSON will only be parsed at runtime, which allows users to
> > tune it without re-compiling the job.
> >
> > We can add another #setSlotSharingGroupResources for configuring the
> > file path of that JSON:
> > ```
> > /**
> >  * Specify fine-grained resource requirements for slot sharing groups
> > with the given resource JSON file. The existing resource
> >  * requirement of the same slot sharing group will be replaced.
> >  */
> > public StreamExecutionEnvironment setSlotSharingGroupResources(
> > String pathToResourceJson);
> > ```
> >
> > WDYT?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo  wrote:
> > >
> > > Thanks for the feedback, Xintong and Wenlong!
> > >
> > > @Wenlong
> > > I think that is a good idea; adjusting the resources without re-compiling
> > > the job will facilitate the tuning process.
> > > We can define a pattern "slot-sharing-group.resource.{ssg name}"
> > > (welcome any proposal for the prefix naming) for the resource spec
> > > config of a slot sharing group. Then, user can set the ResourceSpec of
> > > SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0,
> > > heap: 100m, off-heap: 100m}". WDYT?
> > >
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl 
> > wrote:
> > > >
> > > > Thanks Yangze for the flip, it is great for users to be able to
> > declare the
> > > > fine-grained resource requirements for the job.
> > > >
> > > > I have one minor suggestion: can we support setting resource
> > requirements
> > > > by configuration? Currently most of the config options in execution
> > config
> > > > can be configured by configuration, and it is very likely that users
> > need
> > > > to adjust the resource according to the performance of their job during
> > > > debugging. Providing a configuration-based way will make it more convenient.
> > > >
> > > > Bests,
> > > > Wenlong Lyu
> > > >
> > > > On Thu, 3 Jun 2021 at 15:59, Xintong Song 
> > wrote:
> > > >
> > > > > Thanks Yangze for preparing the FLIP.
> > > > >
> > > > > The proposed changes look good to me.
> > > > >
> > > > > As you've mentioned in the implementation plan, I believe one of the
> > most
> > > > > important tasks of this FLIP is to have the feature well documented.
> > It
> > > > > would be really nice if we can keep that in mind and start drafting
> > the
> > > > > documentation early.
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo 
> > wrote:
> > > > >
> > > > > > Hi, there,
> > > > > >
> > > > > > We would like to start a discussion thread on "FLIP-169: DataStream
> > > > > > API for Fine-Grained Resource Requirements"[1], where we propose
> > the
> > > > > > DataStream API 

[jira] [Created] (FLINK-22917) Dynamically change the log level of apache flink at runtime

2021-06-08 Thread pierrexiong (Jira)
pierrexiong created FLINK-22917:
---

 Summary: Dynamically change the log level of apache flink at 
runtime
 Key: FLINK-22917
 URL: https://issues.apache.org/jira/browse/FLINK-22917
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Configuration
Reporter: pierrexiong


In many business scenarios, developers need to dynamically change the level of 
the log to facilitate the rapid detection of bugs. Does the community have any 
plans about this function?

 

 





Re: [DISCUSS] Watermark propagation with Sink API

2021-06-08 Thread Piotr Nowojski
Hi Eron,

The FLIP-167 is narrow, but we recently discovered some problems with
current idleness semantics as Arvid explained. We are planning to present a
new proposal to redefine them. Probably as a part of it, we would need to
rename them. Given that, I think it doesn't make sense to expose idleness
to the sinks before we rename and define it properly. In other words:

> 2. When the sink operator is idled, tell the sink function.

We shouldn't expose stream status as a part of public API until it's
properly defined.

I would propose one of the two things:
1. Proceed with FLIP-167, without exposing idleness in the sinks YET.
Exposing idleness could be part of this next/future FLIP that would define
idleness in the first place.
2. Block FLIP-167, until the idleness is fixed.

I would vote for option number 1.
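
For readers following the idleness discussion, here is a minimal sketch of the redefinition under consideration (see Arvid's quoted message below): an idle channel is excluded from the minimum-watermark calculation and may still deliver records. The sketch assumes that only a watermark switches a channel back to active. The class and function names are hypothetical and do not correspond to Flink classes.

```python
class InputChannel:
    """One upstream channel as seen by a downstream task (hypothetical model)."""

    def __init__(self):
        self.watermark = float("-inf")
        self.idle = False

    def on_idle(self):
        self.idle = True

    def on_record(self, record):
        # Assumption: a record does NOT end idleness under the proposed semantics.
        return record

    def on_watermark(self, timestamp):
        # Assumption: only a watermark switches the channel back to active.
        self.idle = False
        self.watermark = max(self.watermark, timestamp)


def combined_watermark(channels):
    """Minimum watermark over active channels; None if every channel is idle."""
    active = [c.watermark for c in channels if not c.idle]
    return min(active) if active else None


a, b = InputChannel(), InputChannel()
a.on_watermark(10)
b.on_watermark(5)
both_active = combined_watermark([a, b])    # b holds the minimum back

b.on_idle()
b.on_record("late record")                  # b stays idle
b_idle = combined_watermark([a, b])         # only a is considered

b.on_watermark(7)
b_active_again = combined_watermark([a, b])
```

Whether reactivation should happen implicitly on a watermark or through an explicit `markActive` call is exactly the kind of question the follow-up FLIP would need to settle.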

Piotrek

On Mon, Jun 7, 2021 at 18:08 Eron Wright wrote:

> Piotr, David, and Arvid, we've had an expansive discussion but ultimately
> the proposal is narrow.  It is:
> 1. When a watermark arrives at the sink operator, tell the sink function.
> 2. When the sink operator is idled, tell the sink function.
>
> With these enhancements, we will significantly improve correctness in
> multi-stage flows, and facilitate an exciting project in the Pulsar
> community.  Would you please lend your support to FLIP-167 so that we can
> land this enhancement for 1.14?  My deepest thanks!
>
> -Eron
>
>
>
>
> On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise  wrote:
>
> > Hi Eron,
> >
> > you either have very specific use cases in mind or have a misconception
> > about idleness in Flink with the new sources. The basic idea is that you
> > have watermark generators only at the sources and the user supplies them.
> > As a source author, you have no option to limit that. Here a bit of
> > background:
> >
> > We observed that many users that read from Kafka were confused about no
> > visible progress in their Flink applications because of some idle
> partition
> > and we introduced idleness subsequently. Idleness was always considered
> as
> > a means to achieve progress at the risk of losing a bit of correctness.
> > So especially in the case that you describe with a Pulsar partition that
> is
> > empty but indefinitely active, the user needs to be able to use idleness
> > such that downstream window operators progress.
> >
> > I hope to have clarified that "I wouldn't recommend using withIdleness()
> > with source-based watermarks." would pretty much make the intended use
> case
> > not work anymore.
> >
> > ---
> >
> > Nevertheless, from the discussion with you and some offline discussion
> with
> > Piotr and Dawid, we actually found quite a bit of drawbacks from the
> > current definition of idleness:
> > - We currently only use idleness to exclude respective upstream tasks
> from
> > participating in watermark generation (as you have eloquently put further
> > up in the thread).
> > - However, the definition is bound to records. So while a partition is
> > idle, no records should be produced.
> > - That brings us into quite a few edge cases, where operators emit
> records,
> > while they are actually idling: Think of timers, asyncIO operators,
> window
> > operators based on timeouts, etc.
> > - The solution would be to turn the operator active while emitting and
> > returning to being idle afterwards (but when?). However, this has some
> > unintended side-effects depending on when you switch back.
> >
> > We are currently thinking that we should rephrase the definition to what
> > you described:
> > - A channel that is active is providing watermarks.
> > - An idle channel is not providing any watermarks but can deliver
> records.
> > - Then we are not talking about idle partitions anymore but explicit and
> > implicit watermark generation and should probably rename the concepts.
> > - This would probably mean that we also need an explicit markActive in
> > source/sink to express that the respective entity now needs to wait for
> > explicit watermarks.
> >
> > I'll open a proper discussion thread tomorrow.
> >
> > Note that we probably shouldn't rush this FLIP until we have clarified
> the
> > semantics of idleness. We could also cut the scope of the FLIP to exclude
> > idleness and go ahead without it (there should be enough binding votes
> > already).
> >
> > On Sat, Jun 5, 2021 at 12:09 AM Eron Wright wrote:
> >
> > > I understand your scenario but I disagree with its assumptions:
> > >
> > > "However, the partition of A is empty and thus A is temporarily idle."
> -
> > > you're assuming that the behavior of the source is to mark itself idle
> if
> > > data isn't available, but that's clearly source-specific and not
> behavior
> > > we expect to have in Pulsar source.  A partition may be empty
> > indefinitely
> > > while still being active.  Imagine that the producer is defending a
> > lease -
> > > "I'm here, there's no data, please don't advance the clock".
> > >
> > > "we bind idleness to wall 

[jira] [Created] (FLINK-22916) Revisit and close JIRA issues around legacy planner

2021-06-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-22916:


 Summary: Revisit and close JIRA issues around legacy planner
 Key: FLINK-22916
 URL: https://issues.apache.org/jira/browse/FLINK-22916
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


We should review which legacy planner issues should be migrated to the Blink 
planner and which ones can simply be closed.





[jira] [Created] (FLINK-22915) Extend Flink ML API to support Estimator/Transformer DAG

2021-06-08 Thread Dong Lin (Jira)
Dong Lin created FLINK-22915:


 Summary: Extend Flink ML API to support Estimator/Transformer DAG
 Key: FLINK-22915
 URL: https://issues.apache.org/jira/browse/FLINK-22915
 Project: Flink
  Issue Type: Improvement
Reporter: Dong Lin


Currently Flink ML API allows users to compose an Estimator/Transformer from a 
pipeline (i.e. linear sequence) of Estimator/Transformer. We propose to extend 
the Flink ML API so that users can compose an Estimator/Transformer from a 
directed-acyclic-graph (i.e. DAG) of Estimator/Transformer. 

This feature is useful for the following use-cases:

1) The preprocessing workflow (shared between training and inference workflows) 
may involve the join of multiple tables, where the join of two tables can be 
expressed as a Transformer of 2 inputs and 1 output. And the preprocessing 
workflow could also involve the split operation, where the split operation has 
1 input (e.g. the original table) and 2 outputs (e.g. the split of the original 
table).

A preprocessing workflow involving the join/split operation 
needs to be expressed as a DAG of Transformer.

2) The graph-embedding algorithm can be expressed as an Estimator, where the 
Estimator takes as input two tables (e.g. a node table and an edge table). The 
corresponding Transformer has 1 input (i.e. the node) and 1 output (i.e. the 
node after embedding)

A training workflow involving the graph-embedding Estimator 
needs to be expressed as a DAG of Transformer/Estimator.
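
To make use-case 1 concrete, here is a minimal sketch of a DAG in which a join step has 2 inputs and 1 output and a split step has 1 input and 2 outputs. It uses plain Python functions and lists rather than the Flink ML API; `run_dag`, `join`, `split` and the table names are hypothetical and only illustrate why a linear pipeline is not sufficient.

```python
def run_dag(nodes, inputs):
    """Run nodes given in topological order.

    Each node is (func, input_names, output_names); func returns a tuple
    with one table per output name.
    """
    tables = dict(inputs)
    for func, in_names, out_names in nodes:
        outputs = func(*[tables[name] for name in in_names])
        tables.update(zip(out_names, outputs))
    return tables


def join(users, orders):
    """2 inputs, 1 output: attach the user name to each order."""
    names = dict(users)
    return ([(names[uid], amount) for uid, amount in orders],)


def split(rows):
    """1 input, 2 outputs: partition rows by amount."""
    small = [r for r in rows if r[1] < 100]
    large = [r for r in rows if r[1] >= 100]
    return (small, large)


nodes = [
    (join, ["users", "orders"], ["joined"]),
    (split, ["joined"], ["small", "large"]),
]
result = run_dag(nodes, {
    "users": [(1, "ann"), (2, "bob")],
    "orders": [(1, 50), (2, 150), (1, 200)],
})
```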









[jira] [Created] (FLINK-22914) Use Kafka New Source in Table/SQL connector

2021-06-08 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-22914:
-

 Summary: Use Kafka New Source in Table/SQL connector
 Key: FLINK-22914
 URL: https://issues.apache.org/jira/browse/FLINK-22914
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kafka
Reporter: Qingsheng Ren
 Fix For: 1.14.0


Currently the Kafka Table/SQL connector is still using the legacy Kafka 
SourceFunction. In order to align DataStream and Table/SQL API, the new Kafka 
source should also be used in Table/SQL connector.





Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-06-08 Thread Steven Wu
> hybrid sounds to me more like the source would constantly switch back and
forth

Initially, the focus of hybrid source is more like a sequenced chain.

But in the future it would be cool that hybrid sources can intelligently
switch back and forth between historical data source (like Iceberg) and
live data source (like Kafka). E.g.,
- if the Flink job is lagging behind Kafka retention, automatically switch
to Iceberg source
- once job caught up, switch back to Kafka source

That can simplify operational aspects of manually switching.
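
A minimal sketch of that switching rule, assuming the job can observe its lag behind the live source and knows the Kafka retention. The function name, the `caught_up_seconds` threshold and the string labels are hypothetical; this is not part of the FLIP-150 proposal.

```python
def choose_source(current, lag_seconds, retention_seconds, caught_up_seconds=60):
    """Pick the source to read from next.

    current: "kafka" or "iceberg" (the source currently in use).
    """
    if lag_seconds >= retention_seconds:
        # Data needed by the job has aged out of Kafka: use the historical source.
        return "iceberg"
    if current == "iceberg" and lag_seconds > caught_up_seconds:
        # Stay on the historical source until the job has caught up.
        return "iceberg"
    return "kafka"


decisions = [
    choose_source("kafka", 100, 1000),     # within retention
    choose_source("kafka", 2000, 1000),    # fell behind retention
    choose_source("iceberg", 500, 1000),   # still catching up
    choose_source("iceberg", 30, 1000),    # caught up
]
```

The separate catch-up threshold is there to avoid flapping between the two sources when the lag hovers around the retention boundary.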


On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise  wrote:

> Sorry for joining the party so late, but it's such an interesting FLIP with
> a huge impact that I wanted to add my 2 cents. [1]
> I'm mirroring some basic question from the PR review to this thread because
> it's about the name:
>
> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
> similar.
> Hybrid has the connotation of 2 for me (maybe because I'm a non-native) and
> does not carry the concatenation concept as well (hybrid sounds to me more
> like the source would constantly switch back and forth).
>
> Could we take a few minutes to think if this is the most intuitive name for
> new users? I'm especially hoping that natives might give some ideas (or
> declare that Hybrid is perfect).
>
> [1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
>
> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu  wrote:
>
> > > Converter function relies on the specific enumerator capabilities to
> set
> > the new start position (e.g.
> > fileSourceEnumerator.getEndTimestamp() and
> > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> >
> > I guess the premise is that a converter is for a specific tuple of
> > (upstream source, downstream source) . We don't have to define generic
> > EndStateT and SwitchableEnumerator interfaces. That should work.
> >
> > The benefit of defining EndStateT and SwitchableEnumerator interfaces is
> > probably promoting uniformity across sources that support
> hybrid/switchable
> > source.
> >
> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise  wrote:
> >
> > > Hi Steven,
> > >
> > > Thank you for the thorough review of the PR and for bringing this back
> > > to the mailing list.
> > >
> > > All,
> > >
> > > I updated the FLIP-150 page to highlight aspects in which the PR
> > > deviates from the original proposal [1]. The goal would be to update
> > > the FLIP soon and bring it to a vote, as previously suggested offline
> > > by Nicholas.
> > >
> > > A few minor issues in the PR are outstanding and I'm working on test
> > > coverage for the recovery behavior, which should be completed soon.
> > >
> > > The dynamic position transfer needs to be concluded before we can move
> > > forward however.
> > >
> > > There have been various ideas, including the special
> > > "SwitchableEnumerator" interface, using enumerator checkpoint state or
> > > an enumerator interface extension to extract the end state.
> > >
> > > One goal in the FLIP is to "Reuse the existing Source connectors built
> > > with FLIP-27 without any change." and I think it is important to honor
> > > that goal given that fixed start positions do not require interface
> > > changes.
> > >
> > > Based on the feedback the following might be a good solution for
> > > runtime position transfer:
> > >
> > > * User supplies the optional converter function (not applicable for
> > > fixed positions).
> > > * Instead of relying on the enumerator checkpoint state [2], the
> > > converter function will be supplied with the current and next
> > > enumerator (source.createEnumerator).
> > > * Converter function relies on the specific enumerator capabilities to
> > > set the new start position (e.g.
> > > fileSourceEnumerator.getEndTimestamp() and
> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> > > * HybridSourceSplitEnumerator starts new underlying enumerator
> > >
> > > With this approach, there is no need to augment FLIP-27 interfaces and
> > > custom source capabilities are easier to integrate. Removing the
> > > mandate to rely on enumerator checkpoint state also avoids potential
> > > upgrade/compatibility issues.
> > >
> > > Thoughts?
> > >
> > > Thanks,
> > > Thomas
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > > [2]
> > >
> >
> https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> > >
> > >
> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu  wrote:
> > > >
> > > > Discussed the PR with Thomas offline. Thomas, please correct me if I
> > > > missed anything.
> > > >
> > > > Right now, the PR differs from the FLIP-150 doc regarding the
> > converter.
> > > > * Current PR uses the enumerator checkpoint state type as the input
> for
> > > the
> > > > converter
> > > > * FLIP-150 defines a new EndStateT 

Re: Re: Add control mode for flink

2021-06-08 Thread Steven Wu
I can see the benefits of control flow. E.g., it might help the old (and
inactive) FLIP-17 side input. I would suggest that we add more details of
some of the potential use cases.

Here is one mismatch with using control flow for dynamic config. Dynamic
config is typically targeted/loaded by one specific operator. Control flow
will propagate the dynamic config to all operators. That is not a problem per se.
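
One way to reconcile the two, sketched under the assumption that a control event can carry the name of its target operator: every operator receives the event, and only the target applies it. The `Operator` class and the event layout are hypothetical and are not taken from any proposal in this thread.

```python
class Operator:
    """Toy operator that holds its own dynamic config (hypothetical)."""

    def __init__(self, name):
        self.name = name
        self.config = {}

    def on_control_event(self, event):
        # Every operator sees the event; only the target applies it.
        if event["target"] == self.name:
            self.config.update(event["config"])


def broadcast(operators, event):
    """Propagate one control event to all operators."""
    for op in operators:
        op.on_control_event(event)


ops = [Operator("source"), Operator("map"), Operator("sink")]
broadcast(ops, {"target": "source", "config": {"max_qps": 1000}})
```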

Regarding using the REST api (to jobmanager) for accepting control
signals from external system, where are we going to persist/checkpoint the
signal? The JobManager can die before the control signal is propagated and
checkpointed. Would we lose the control signal in this case?


On Mon, Jun 7, 2021 at 11:05 PM Xintong Song  wrote:

> +1 on separating the effort into two steps:
>
>1. Introduce a common control flow framework, with flexible interfaces
>for generating / reacting to control messages for various purposes.
>2. Features that leverage the control flow can be worked on
>concurrently
>
> In the meantime, continuing to collect potential features that may leverage the
> control flow should be helpful. It provides good inputs for the control
> flow framework design, to make the framework common enough to cover the
> potential use cases.
>
> My suggestions on the next steps:
>
>1. Allow more time for opinions to be heard and potential use cases to
>be collected
>2. Draft a FLIP with the scope of common control flow framework
>3. We probably need a poc implementation to make sure the framework
>covers at least the following scenarios
>   1. Produce control events from arbitrary operators
>   2. Produce control events from JobMaster
>   3. Consume control events from arbitrary operators downstream where
>   the events are produced
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  wrote:
>
>> Many thanks to Jiangang for bringing this up, and many thanks for the
>> discussion!
>>
>> I also agree with the summarization by Xintong and Jing that control flow
>> seems to be
>> a common building block for many functionalities and the dynamic
>> configuration framework
>> is a representative application that is frequently required by users.
>> Regarding the control flow,
>> currently we are also considering the design of iteration for the
>> flink-ml, and as Xintong has pointed
>> out, it also requires the control flow in cases like detecting global
>> termination inside the iteration
>>  (in this case we need to broadcast an event through the iteration body
>> to detect if there are still
>> records residing in the iteration body). And regarding whether to
>> implement the dynamic configuration
>> framework, I also agree with Xintong that the consistency guarantee would
>> be a point to consider, we
>> might consider if we need to ensure every operator could receive the
>> dynamic configuration.
>>
>> Best,
>> Yun
>>
>>
>>
>> --
>> Sender:kai wang
>> Date:2021/06/08 11:52:12
>> Recipient:JING ZHANG
>> Cc:刘建刚; Xintong Song [via Apache Flink User
>> Mailing List archive.]; user<
>> u...@flink.apache.org>; dev
>> Theme:Re: Add control mode for flink
>>
>>
>>
>> I'm big +1 for this feature.
>>
>>1. Limit the input qps.
>>2. Change log level for debug.
>>
>> in my team, the two examples above are needed
>>
>> On Tue, Jun 8, 2021 at 11:18 AM JING ZHANG wrote:
>>
>>> Thanks Jiangang for bringing this up.
>>> As mentioned in Jiangang's email, `dynamic configuration framework`
>>> provides many useful functions in Kuaishou, because it could update job
>>> behavior without relaunching the job. The functions are very popular in
>>> Kuaishou, and we also see similar demand on the mailing list [1].
>>>
>>> I'm big +1 for this feature.
>>>
>>> Thanks Xintong and Yun for deep thoughts about the issue. I like the
>>> idea about introducing control mode in Flink.
>>> It takes the original issue a big step closer to essence which also
>>> provides the possibility for more fantastic features as mentioned in
>>> Xintong and Jark's response.
>>> Based on the idea, there are at least two milestones to achieve the
>>> goals which were proposed by Jiangang:
>>> (1) Build a common control flow framework in Flink.
>>>  It focuses on control flow propagation. And, how to integrate the
>>> common control flow framework with existing mechanisms.
>>> (2) Build a dynamic configuration framework which is exposed to users
>>> directly.
>>>  We could see dynamic configuration framework is a top application
>>> on the underlying control flow framework.
>>>  It focuses on the Public API which receives configuration updating
>>> requests from users. Besides, it is necessary to introduce an API
>>> protection mechanism to avoid job performance degradation caused by too
>>> many control events.
>>>
>>> I suggest splitting the whole design into two after we reach a consensus
>>> on whether to introduce this feature because these two 

Re: Re: Add control mode for flink

2021-06-08 Thread Xintong Song
+1 on separating the effort into two steps:

   1. Introduce a common control flow framework, with flexible interfaces
   for generating / reacting to control messages for various purposes.
   2. Features that leverage the control flow can be worked on
   concurrently

In the meantime, continuing to collect potential features that may leverage the
control flow should be helpful. It provides good inputs for the control
flow framework design, to make the framework common enough to cover the
potential use cases.

My suggestions on the next steps:

   1. Allow more time for opinions to be heard and potential use cases to
   be collected
   2. Draft a FLIP with the scope of common control flow framework
   3. We probably need a poc implementation to make sure the framework
   covers at least the following scenarios
  1. Produce control events from arbitrary operators
  2. Produce control events from JobMaster
  3. Consume control events from arbitrary operators downstream where
  the events are produced


Thank you~

Xintong Song



On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  wrote:

> Many thanks to Jiangang for bringing this up, and many thanks for the
> discussion!
>
> I also agree with the summarization by Xintong and Jing that control flow
> seems to be
> a common building block for many functionalities and the dynamic configuration
> framework
> is a representative application that is frequently required by users.
> Regarding the control flow,
> currently we are also considering the design of iteration for the
> flink-ml, and as Xintong has pointed
> out, it also requires the control flow in cases like detecting global
> termination inside the iteration
>  (in this case we need to broadcast an event through the iteration body
> to detect if there are still
> records residing in the iteration body). And regarding whether to implement
> the dynamic configuration
> framework, I also agree with Xintong that the consistency guarantee would
> be a point to consider, we
> might consider if we need to ensure every operator could receive the
> dynamic configuration.
>
> Best,
> Yun
>
>
>
> --
> Sender:kai wang
> Date:2021/06/08 11:52:12
> Recipient:JING ZHANG
> Cc:刘建刚; Xintong Song [via Apache Flink User
> Mailing List archive.]; user<
> u...@flink.apache.org>; dev
> Theme:Re: Add control mode for flink
>
>
>
> I'm big +1 for this feature.
>
>1. Limit the input qps.
>2. Change log level for debug.
>
> in my team, the two examples above are needed
>
> On Tue, Jun 8, 2021 at 11:18 AM JING ZHANG wrote:
>
>> Thanks Jiangang for bringing this up.
>> As mentioned in Jiangang's email, `dynamic configuration framework`
>> provides many useful functions in Kuaishou, because it could update job
>> behavior without relaunching the job. The functions are very popular in
>> Kuaishou, and we also see similar demand on the mailing list [1].
>>
>> I'm big +1 for this feature.
>>
>> Thanks Xintong and Yun for deep thoughts about the issue. I like the idea
>> about introducing control mode in Flink.
>> It takes the original issue a big step closer to essence which also
>> provides the possibility for more fantastic features as mentioned in
>> Xintong and Jark's response.
>> Based on the idea, there are at least two milestones to achieve the goals
>> which were proposed by Jiangang:
>> (1) Build a common control flow framework in Flink.
>>  It focuses on control flow propagation. And, how to integrate the
>> common control flow framework with existing mechanisms.
>> (2) Build a dynamic configuration framework which is exposed to users
>> directly.
>>  We could see dynamic configuration framework is a top application on
>> the underlying control flow framework.
>>  It focuses on the Public API which receives configuration updating
>> requests from users. Besides, it is necessary to introduce an API
>> protection mechanism to avoid job performance degradation caused by too
>> many control events.
>>
>> I suggest splitting the whole design into two after we reach a consensus
>> on whether to introduce this feature because these two sub-topic all need
>> careful design.
>>
>>
>> [
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>> ]
>>
>> Best regards,
>> JING ZHANG
>>
>> On Tue, Jun 8, 2021 at 10:01 AM 刘建刚 wrote:
>>
>>> Thanks Xintong Song for the detailed supplement. Since flink is
>>> long-running, it is similar to many services. So interacting with it or
>>> controlling it is a common desire. This was our initial thought when
>>> implementing the feature. In our internal Flink, many configs set in YAML can
>>> be adjusted dynamically to avoid restarting the job, for example:
>>>
>>>1. Limit the input qps.
>>>2. Degrade the job by sampling and so on.
>>>3. Reset kafka offset in certain cases.
>>>4. Stop checkpoint in certain cases.
>>>5. Control the history consuming.
>>>6. 

[jira] [Created] (FLINK-22913) Support Python UDF chaining in Python DataStream API

2021-06-08 Thread Dian Fu (Jira)
Dian Fu created FLINK-22913:
---

 Summary: Support Python UDF chaining in Python DataStream API
 Key: FLINK-22913
 URL: https://issues.apache.org/jira/browse/FLINK-22913
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.14.0


Currently, for the following job:
{code}
ds = ..
ds.map(map_func1)
    .map(map_func2)
{code}

The Python functions `map_func1` and `map_func2` will run in separate Python 
workers, and the result of `map_func1` will be transferred to the JVM and then 
transferred to `map_func2`, which may reside in another Python worker. This 
introduces redundant communication and serialization/deserialization overhead.
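
Conceptually, chaining amounts to composing the two functions so that the intermediate result never leaves the Python worker. A minimal sketch in plain Python (no PyFlink calls; `chain` and the two example functions are hypothetical stand-ins for the job above):

```python
from functools import reduce


def chain(*funcs):
    """Compose map functions left to right into a single function."""
    def chained(value):
        return reduce(lambda acc, f: f(acc), funcs, value)
    return chained


def map_func1(x):
    return x + 1


def map_func2(x):
    return x * 2


# One call per element instead of two, with no hand-off between the functions.
chained = chain(map_func1, map_func2)
results = [chained(x) for x in [1, 2, 3]]
unchained = [map_func2(map_func1(x)) for x in [1, 2, 3]]
```

The chained and unchained versions produce the same output; the difference is only where the intermediate value travels.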


