[jira] [Created] (FLINK-24713) Postpone resourceManager serving after the recovery phase has finished

2021-10-31 Thread Aitozi (Jira)
Aitozi created FLINK-24713:
--

 Summary: Postpone resourceManager serving after the recovery phase 
has finished
 Key: FLINK-24713
 URL: https://issues.apache.org/jira/browse/FLINK-24713
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.14.1
Reporter: Aitozi


When the ResourceManager starts, the JobManager connects to it, which means 
the ResourceManager begins trying to serve resource requests from the 
SlotManager.

When the ResourceManager fails over, it tries to recover the pods / 
containers from the previous attempt, but new resource requirements may 
arrive before the old TaskManagers have registered with the SlotManager.

In this case the number of required TaskManagers may be doubled when the 
JobManager fails over. We may need a mechanism to postpone ResourceManager 
serving until the recovery phase has finished.
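
A minimal sketch of one possible gating mechanism (all class and method 
names below are hypothetical, not existing Flink APIs): resource 
requirements are buffered until recovery of the previous attempt's workers 
has finished, so recovered TaskManagers get a chance to register first.

import java.util.ArrayList;
import java.util.List;

class GatedResourceAllocator {

    // Placeholder for whatever describes "N slots of profile X".
    static final class ResourceRequirement { }

    private final List<ResourceRequirement> pending = new ArrayList<>();
    private boolean recoveryFinished;

    synchronized void onResourceRequirement(ResourceRequirement requirement) {
        if (recoveryFinished) {
            allocateNewWorkers(requirement);
        } else {
            pending.add(requirement); // hold back until recovery has finished
        }
    }

    synchronized void onRecoveryFinished() {
        // Only now request new pods / containers for whatever the recovered
        // TaskManagers could not cover.
        recoveryFinished = true;
        pending.forEach(this::allocateNewWorkers);
        pending.clear();
    }

    private void allocateNewWorkers(ResourceRequirement requirement) {
        // e.g. ask the active deployment (K8s / YARN) for new workers
    }
}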



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24712) NFATest#testSimpleNFA: fix test flakiness

2021-10-31 Thread Alexander Ronald Altman (Jira)
Alexander Ronald Altman created FLINK-24712:
---

 Summary: NFATest#testSimpleNFA: fix test flakiness
 Key: FLINK-24712
 URL: https://issues.apache.org/jira/browse/FLINK-24712
 Project: Flink
  Issue Type: Improvement
  Components: Library / CEP
Reporter: Alexander Ronald Altman


{{org.apache.flink.cep.nfa.NFATest#testSimpleNFA}} was flaky under 
[NonDex|https://github.com/TestingResearchIllinois/NonDex] because the outer 
order of the event patterns is unconstrained. The fix sorts them by start ID, 
which is unique in this particular case, as a quick way to make the 
comparison robust.
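
A minimal sketch of the approach (the type and accessor below are 
hypothetical stand-ins for the ones used in the test): both the expected and 
the actual matches are sorted by their unique start ID before comparison, so 
the assertion no longer depends on iteration order.

import static org.junit.Assert.assertEquals;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

class OrderIndependentAssert {

    // Hypothetical stand-in for the matched-pattern type used in NFATest.
    static final class Match {
        final long startId;   // assumed unique per match in this test
        final String events;

        Match(long startId, String events) {
            this.startId = startId;
            this.events = events;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Match
                    && startId == ((Match) o).startId
                    && Objects.equals(events, ((Match) o).events);
        }

        @Override
        public int hashCode() {
            return Objects.hash(startId, events);
        }
    }

    static void assertSameMatches(List<Match> expected, List<Match> actual) {
        Comparator<Match> byStartId = Comparator.comparingLong(m -> m.startId);
        assertEquals(
                expected.stream().sorted(byStartId).collect(Collectors.toList()),
                actual.stream().sorted(byStartId).collect(Collectors.toList()));
    }
}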



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24711) Typed ID support

2021-10-31 Thread Hady Willi (Jira)
Hady Willi created FLINK-24711:
--

 Summary: Typed ID support
 Key: FLINK-24711
 URL: https://issues.apache.org/jira/browse/FLINK-24711
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Reporter: Hady Willi


Hi everyone,

Typically in my org we use protobuf for both keys and payloads in Kafka. Is 
there any plan to support typed message IDs? Currently, converting the ID 
bytes to a UTF-8 string and then back to bytes does not preserve the bytes 
correctly.
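
For illustration, a minimal self-contained example of the corruption 
described above (the byte values are arbitrary and only stand in for a 
protobuf-encoded key):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Utf8RoundTrip {
    public static void main(String[] args) {
        // Arbitrary non-UTF-8 bytes, e.g. a protobuf-encoded key.
        byte[] original = new byte[] {0x08, (byte) 0x96, 0x01, (byte) 0xFF};

        // Bytes -> UTF-8 String -> bytes, as happens today.
        String asString = new String(original, StandardCharsets.UTF_8);
        byte[] roundTripped = asString.getBytes(StandardCharsets.UTF_8);

        // Invalid UTF-8 sequences are replaced with U+FFFD, so the bytes differ.
        System.out.println(Arrays.equals(original, roundTripped)); // prints false
    }
}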

Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24710) Kafka header support for ingress and egress

2021-10-31 Thread Hady Willi (Jira)
Hady Willi created FLINK-24710:
--

 Summary: Kafka header support for ingress and egress
 Key: FLINK-24710
 URL: https://issues.apache.org/jira/browse/FLINK-24710
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Reporter: Hady Willi


Hi everyone, 

In my org we use Kafka headers as a metadata store. I understand that this 
pattern can also be achieved with a map in the payload type, but I want to 
get a sense of how the StateFun community thinks about this feature.
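
For reference, this is roughly what we do today with the plain Kafka client 
(topic and header names below are made up); the ask is for the StateFun 
Kafka ingress/egress to expose the same Headers:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

class HeaderExample {
    static ProducerRecord<byte[], byte[]> withMetadata(byte[] key, byte[] value) {
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>("orders", key, value);
        // Metadata travels next to the payload instead of inside it.
        record.headers().add("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));
        record.headers().add("schema-version", "2".getBytes(StandardCharsets.UTF_8));
        return record;
    }
}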

Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-31 Thread Kurt Young
Hi Till,

We have discussed offline with Stephan and Timo the possibility of putting
this FLIP into another repository. This looks similar to another ongoing
effort that is trying to move all connectors outside the Flink core
repository.

From its motivation and scope, this FLIP is quite different from the current
connectors in some aspects. What we are trying to offer is a kind of
built-in storage, or what we could call internal/managed tables; the current
connectors, by contrast, essentially express external tables of Flink SQL.
Functionality-wise, this managed table would have more capabilities than all
of these connectors, since we control the implementation of the storage.
Thus the table storage will interact with many SQL components, like metadata
handling, optimization, execution, etc.
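
To make the distinction concrete, a rough sketch (the managed form is only
what this FLIP proposes, not an existing feature; table names and options
below are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

class ManagedVsExternal {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // External (unmanaged) table: a mapping onto an existing Kafka topic,
        // selected via the 'connector' option.
        tEnv.executeSql(
                "CREATE TABLE external_orders (order_id BIGINT, amount DOUBLE) WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'orders',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'format' = 'json')");

        // Managed table as proposed by FLIP-188: no 'connector' option, so Flink
        // itself would own the underlying storage and its lifecycle (incl. DROP).
        tEnv.executeSql(
                "CREATE TABLE managed_orders (order_id BIGINT, amount DOUBLE)");
    }
}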

However, we do see some potential benefits if we choose to put it outside
Flink:
- We may achieve faster development and perhaps more frequent releases.
- It forces us to think really clearly about what the interfaces should be,
because we don't have the shortcut of modifying core and connector code at
the same time.

But we also can't ignore the overhead:
- We need almost everything that is discussed in the connector-splitting
thread.
- We would have to create many more interfaces than TableSource/TableSink
just to make it work in the first place, e.g. interfaces to express that
such tables should be managed by Flink, and interfaces to express the
physical capabilities of the storage so that it can be bridged to the SQL
optimizer and executor.
- Creating lots of interfaces with only one implementation sounds like
overengineering to me.

Weighing the pros and cons above, what we are trying to do is to first
implement it in a feature branch, while keeping good engineering and design
in mind. At some point we can re-evaluate whether to put it inside or
outside the Flink core. What do you think?

Best,
Kurt


On Fri, Oct 29, 2021 at 11:53 PM Till Rohrmann  wrote:

> Hi Jingsong,
>
> Thanks for creating this FLIP. I don't have a lot to add because I am not
> very familiar with the SQL components. While reading the FLIP I was
> wondering what would we need in Flink to build something like the BDT
> feature outside of Flink as a kind of extension? Would something like this
> be possible? Maybe the answer is a quick no ;-)
>
> Cheers,
> Till
>
> On Thu, Oct 28, 2021 at 8:06 AM Jingsong Li 
> wrote:
>
> > Hi all,
> >
> > I updated FLIP based on your feedback:
> >
> > 1. Introduce interfaces: GenericCatalog, ManagedTableFactory,
> > TableDescriptor.forManaged
> >
> > 2. Introduce log.scan.startup.mode (default initial) to Hybrid source.
> >
> > 3. Add description to miss dropped table.
> >
> > Best,
> > Jingsong
> >
> > On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li 
> > wrote:
> > >
> > > Hi Ingo,
> > >
> > > Really appreciate your feedback.
> > >
> > > #1. The reason why we insist on using no "connector" option is that we
> > > want to bring the following design to users:
> > > - With the "connector" option, it is a mapping, unmanaged table.
> > > - Without the "connector" option, it is a managed table. It may be an
> > > Iceberg managed table, or may be a JDBC managed table, or may be a
> > > Flink managed table.
> > >
> > > #2. About:
> > > CREATE TABLE T (f0 INT);
> > > ALTER TABLE T SET ('connector' = '…');
> > >
> > > I think it is dangerous, even for a generic table. The managed table
> > > should prohibit it.
> > >
> > > #3. DDL and Table API
> > >
> > > You are right, Table Api should be a superset of SQL. There is no
> > > doubt that it should support BDT.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk  wrote:
> > > >
> > > > Hi Jingsong,
> > > >
> > > > thanks again for the answers. I think requiring catalogs to implement
> > an
> > > > interface to support BDTs is something we'll need (though personally
> I
> > > > still prefer explicit DDL here over the "no connector option"
> > approach).
> > > >
> > > > What about more edge cases like
> > > >
> > > > CREATE TABLE T (f0 INT);
> > > > ALTER TABLE T SET ('connector' = '…');
> > > >
> > > > This would have to first create the physical storage and then delete
> it
> > > > again, right?
> > > >
> > > > On a separate note, the FLIP currently only discusses SQL DDL, and you
> > have
> > > > also mentioned
> > > >
> > > > > BDT only can be dropped by Flink SQL DDL now.
> > > >
> > > > Something Flink suffers from a lot is inconsistencies across APIs. I
> > think
> > > > it is important that we support features on all major APIs, i.e.
> > including
> > > > Table API.
> > > > For example for creating a BDT this would mean e.g. adding something
> > like
> > > > #forManaged(…) to TableDescriptor.
> > > >
> > > >
> > > > Best
> > > > Ingo
> > > >
> > > > On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li 
> > wrote:
> > > >
> > > > > Hi Ingo,
> > > > >
> > > > > I thought again.
> > > > >
> > > > > I'll try to sort out the current catalog behaviors.
> > > 

[jira] [Created] (FLINK-24709) Fix the interval join Java example in the official documentation

2021-10-31 Thread XiaShengSheng (Jira)
XiaShengSheng created FLINK-24709:
-

 Summary: Fix the interval join Java example in the official 
documentation
 Key: FLINK-24709
 URL: https://issues.apache.org/jira/browse/FLINK-24709
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.14.0, 1.13.0, 1.12.0, 1.11.0, 1.10.0, 1.9.0, 1.8.0
Reporter: XiaShengSheng
 Attachments: case.png

Fix the interval join Java example in the official documentation.

Take the Flink 1.12.0 documentation as an example:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html#interval-join

1. The current example in the documentation (the variables used in collect()
do not exist):
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String>(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(first + "," + second);
        }
    });


2. After the fix:
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String>(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-187: Adaptive Batch Job Scheduler

2021-10-31 Thread Lijie Wang
Hi Till & Zhu,

Thanks for your feedback, and also for your comments and suggestions on the
wiki, which are very helpful for perfecting the FLIP.

I also agree that we should provide our users with consistent and
easy-to-understand deployment options. Regarding the three options proposed
by Till, my opinion is the same as Zhu's: in the first version we can
support only option 1, and options 2 and 3 can be future improvements.

Regarding the side note about abstracting subpartitions as splits, although
it is not our original intention, I personally feel it is meaningful. It is
also helpful to users, who can use it for monitoring in order to track job
progress in detail.
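
As a side illustration of the FLIP's core idea, a simplified sketch of
deriving a vertex's parallelism from the amount of data it consumes (names
and the formula are simplifications, not the actual implementation):

class ParallelismSketch {

    // Derive parallelism from consumed data size, clamped to configured bounds.
    static int decideParallelism(
            long consumedBytes, long targetBytesPerTask,
            int minParallelism, int maxParallelism) {
        int byVolume = (int) Math.ceil((double) consumedBytes / targetBytesPerTask);
        return Math.max(minParallelism, Math.min(maxParallelism, byVolume));
    }

    public static void main(String[] args) {
        // 10 GiB consumed with a 1 GiB-per-task target -> parallelism 10.
        System.out.println(decideParallelism(10L << 30, 1L << 30, 1, 128));
    }
}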

Best,

Lijie

On Sat, Oct 30, 2021 at 3:47 PM Zhu Zhu  wrote:

> Hi Till,
>
> Thanks for the comments!
>
> I agree with you that we should avoid an auto-scaled job not able to be
> scheduled
> in standalone/reactive mode. And I think it's great if we can expose a
> deployment
> option that is consistent for streaming and batch jobs, which can be easier
> to
> understand. Just looking forward to the day when both adaptive schedulers are
> the default, so
> that most users do not need to care about job tuning while the job can run
> well.
>
> Regarding the three options, personally I prefer to take *#1* as the first
> step, to
> limit the scope of this FLIP a bit, otherwise it may be too complicated.
> I think *#3* is the final goal we need to target later, so that mixed
> bounded and
> unbounded workloads can be supported. Given that there can be multiple
> stages scheduled at the same time, the design of the scheduling may not be
> very straightforward and needs some thorough consideration.
> *#2* can be a very good improvement itself. Shuffles of batch jobs can be
> auto-determined to be pipelined or blocking according to available
> resources.
> But the changes may involve many components and can be large. So I think
> it can be a standalone future improvement.
>
> Regarding the side note to abstract subpartitions as splits, the idea is
> very
> interesting to me. Besides supporting auto scaling, I think trackable
> produced
> splits can also help in troubleshooting and give some insights for future
> improvements. Collecting data sizes for batch adaptive scheduler can be the
> first step and we can further consider the abstraction of it.
>
> Thanks,
> Zhu
>
> On Fri, Oct 29, 2021 at 10:47 PM Till Rohrmann  wrote:
>
> > Hi Lijie,
> >
> > Thanks for drafting this FLIP together with Zhu Zhu :-)
> >
> > I like the idea of making the parallelism of operators of a bounded job
> > dependent on the data size. This makes the job adjust automatically when
> > the data sources/sizes change.
> >
> > I can see this work well in combination with the active mode where Flink
> > can ask for more resources.
> >
> > In the case of the standalone mode, I think it can lead to situations
> where
> > one and the same job can be scheduled or not depending on the input data.
> > The problem is pipelined regions that contain more than a single operator
> > instance (e.g. pipelined shuffles). We already have this problem when
> > submitting a batch job with too high parallelism onto a standalone
> cluster.
> > However, with the adaptive batch mode this problem might become a bit
> more
> > present. So my question would be how can we solve this problem
> (potentially
> > in a follow up step). I could think of the following three alternatives
> > atm:
> >
> > 1. Only allow blocking data exchanges: This will limit the size of a
> > pipelined region to a single operator instance. This has the downside
> that
> > we no longer support pipelined execution of multiple operators (other
> than
> > chained). Moreover, it requires the user to set all data exchanges to
> > blocking which cannot be enforced atm.
> > 2. Introduce a new pipelined-blocking data exchange hybrid that supports
> > pipelined data exchanges but can also spill to disk if there is no
> > consumer: This could allow to still make progress in case that one has a
> > pipelined region which requires more slots than what we currently have.
> > 3. Decide on the actual parallelism of a pipelined region after having
> > received the slots that are declared based on the data size per subtask.
> If
> > the pipelined region contains an all-to-all connection, then the
> > parallelism is how many slots we currently have. If not, then the
> > parallelism can be decided by the data volume: This would effectively
> mean
> > to enable the existing AdaptiveScheduler to also run batch workloads.
> >
> > With either of these options, I believe that we could provide a somewhat
> > consistent behaviour across the different deployment and execution modes
> > wrt to scaling:
> >
> > a) Active + streaming job that uses AdaptiveScheduler: Can run with fewer
> > slots than requested. Can ask for more slots. Once new slots arrive it
> will
> > make use of it.
> > b) Reactive + streaming job that uses AdaptiveScheduler: Can run with
>