Re: [ANNOUNCE] New Apache Flink Committer - Congxian Qiu

2020-10-28 Thread Jark Wu
Congrats Congxian!

Best,
Jark

On Thu, 29 Oct 2020 at 14:28, Yu Li  wrote:

> Hi all,
>
> On behalf of the PMC, I’m very happy to announce Congxian Qiu as a new
> Flink committer.
>
> Congxian has been an active contributor for more than two years, with 226
> contributions including 76 commits and many PR reviews.
>
> Congxian mainly works on state backend and checkpoint modules, meantime is
> one of the main maintainers of our Chinese document translation.
>
> Besides his work on the code, he has been driving initiatives on dev@
> list,
> supporting users and giving talks at conferences.
>
> Please join me in congratulating Congxian for becoming a Flink committer!
>
> Cheers,
> Yu
>


Re: Could someone please take a look at mr PR, thanks

2020-11-02 Thread Jark Wu
Sorry for the late reviewing. The community is busy preparing for 1.12
release.
I guess this is the reason that this PR was missed to review.
I will help to review this PR later.

Thanks for the contribution!

Best,
Jark

On Tue, 3 Nov 2020 at 10:36, JiaTao Tao  wrote:

> Here I fix a potential NPE and contributed my patch, but no one reviewed
> after several months (https://github.com/apache/flink/pull/12702), this PR
> has conflicts, so I pull a new one (
> https://github.com/apache/flink/pull/13836), would someone be nice to take
> a look at this?
>
>
> Regards!
>
> Aron Tao
>


Re: [DISCUSS] FLIP-145: Support SQL windowing table-valued function

2020-11-08 Thread Jark Wu
Hi all,

After some offline discussion and investigation with Timo and Danny, I have
updated the FLIP-145.

FLIP-145:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function

Here are the updates:
1. Add SESSION window syntax and examples.
2. Time Attribute: the returned value of window TVF will return 3 columns
now with additional window_time
which is a time attribute. Add a section of "Time Attribute Propagate"
to explain how to propagate time attributes and examples.
3. The old window syntax will be deprecated. We may drop the old syntax in
the future but that needs another discussion.
4.  Add future work about simplifying TABLE() keyword (we already started
discussion in Calcite [1]) and supporting COUNT window.

Besides, we also investigated whether it is possible to use a nested type
"window(start, end, time)" instead of 3 columns.
However, there are some problems that are not possible for now.
- `window.start` can’t be selected in the group by query, because it is not
grouped.
   Postgres supports selecting nested fields for grouped ROW columns. We
can fix this in Calcite, but this isn't a trivial work.
- WINDOW is a token in the parser, can’t be used as a column name.
Otherwise, the parsing for OVER WINDOW will fail.
- Apache Beam also considered to put wstart and wend in a separate nested
row [2]. However, that would limit these extensions
  to engines supporting nested rows. Many systems don't support nested rows
well.

Therefore, we still insist on using three fields.

I would like to start a new VOTE for the updated FLIP-145 if there are no
objections.

Best,
Jark

[1]:
https://lists.apache.org/x/thread.html/ra98db08e280ddd9adeef62f456f61aedfdf7756e215cb4d66e2a52c9@%3Cdev.calcite.apache.org%3E
[2]:
https://docs.google.com/document/d/138uA7VTpbF84CFrd--cz3YVe0-AQ9ALnsavaSE2JeE4/edit?disco=HJ0EnGI


On Thu, 15 Oct 2020 at 21:03, Danny Chan  wrote:

> Hi, Timo ~
>
> > We are not forced by
> the standard to do it as stated in the `One SQL to Rule it all` paper
>
> No, slide to the SQL standard is always better, i think this is a common
> routine of our Flink SQL now, without a standard, everyone can give a
> preference and the discussion would easily go too far apart.
>
> > We can align the SQL windows more towards our regular DataStream API
> windows, where you keyBy first and then apply a window operator.
>
> I don't think current DataStream window join implement the window
> semantics correctly, it joins the data set first then windowing the LHS and
> RHS data together, actually each input should window its data set
> separately.
>
> As for the "key by data set first", current window TVF appends just window
> attributes and thus it is very light-weight and orthorhombic, we can
> combine the window TVFs with additional joins, aggregations, TopN and so on.
>
> In SQL, people usually describe the "KEY BY" with "GROUP BY" caluse, that
> means we bind strongly the window TVF and aggregate operator together which
> i would definitely vote a -1.
>
> As for the PARTTION BY, there are specific cases for the "SESSION" window
> because a session often has a logic key there, we can extend the PARTTION
> BY syntax because it is already in the SQL standard, i'm confused why a
> Tumble window has a PARTITION key there ? What is the real use case ?
>
> -1 for "ORDER BY" because sort on un-bounded data set does not have
> meanings. For un-bounded data set we already has the watermark to handle
> the out-of-orderness data, and for bounded data set, we can use the regular
> sort here because current table argument allows any query actually.
>
> Best,
> Danny Chan
> 在 2020年10月15日 +0800 PM5:16,dev@flink.apache.org,写道:
> >
> > Personally, I find this easier to explain to users than telling them the
> > difference why a session window has SET semantic input tables and
> > tumble/sliding have ROW semantic input tables.
>


[VOTE] FLIP-145: Support SQL windowing table-valued function (2nd)

2020-11-09 Thread Jark Wu
Hi all,

There is new feedback on the FLIP-145. So I would like to start a new vote
for FLIP-145 [1],
which has been discussed and reached consensus in the discussion thread [2].

The vote will be open until 15:00 (UTC+8) 13th Nov. (72h), unless there is
an objection or not enough votes.

Best,
Jark

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function
[2]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-145-Support-SQL-windowing-table-valued-function-td45269.html


Re: [VOTE] FLIP-145: Support SQL windowing table-valued function (2nd)

2020-11-10 Thread Jark Wu
+1 (binding)

On Tue, 10 Nov 2020 at 14:59, Jark Wu  wrote:

> Hi all,
>
> There is new feedback on the FLIP-145. So I would like to start a new vote
> for FLIP-145 [1],
> which has been discussed and reached consensus in the discussion thread
> [2].
>
> The vote will be open until 15:00 (UTC+8) 13th Nov. (72h), unless there is
> an objection or not enough votes.
>
> Best,
> Jark
>
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function
> [2]:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-145-Support-SQL-windowing-table-valued-function-td45269.html
>


Re: [DISCUSS] Introduce TableFactory for StatefulSequenceSource

2020-03-21 Thread Jark Wu
+1 to Bowen's proposal. I also saw many requirements on such built-in
connectors.

I will leave some my thoughts here:

> 1. datagen source (random source)
I think we can merge the functinality of sequence-source into random source
to allow users to custom their data values.
Flink can generate random data according to the field types, users
can customize their values to be more domain specific, e.g.
'field.user'='User_[1-9]{0,1}'
This will be similar to kafka-datagen-connect[1].

> 2. console sink (print sink)
This will be very useful in production debugging, to easily output an
intermediate view or result view to a `.out` file.
So that we can look into the data representation, or check dirty data.
This should be out-of-box without manually DDL registration.

> 3. blackhole sink (no output sink)
This is very useful for high performance testing of Flink, to meansure the
throughput of the whole pipeline without sink.
Presto also provides this as a built-in connector [2].

Best,
Jark

[1]:
https://github.com/confluentinc/kafka-connect-datagen#define-a-new-schema-specification
[2]: https://prestodb.io/docs/current/connector/blackhole.html


On Sat, 21 Mar 2020 at 12:31, Bowen Li  wrote:

> +1.
>
> I would suggest to take a step even further and see what users really need
> to test/try/play with table API and Flink SQL. Besides this one, here're
> some more sources and sinks that I have developed or used previously to
> facilitate building Flink table/SQL pipelines.
>
>
>1. random input data source
>   - should generate random data at a specified rate according to schema
>   - purposes
>  - test Flink pipeline and data can end up in external storage
>  correctly
>  - stress test Flink sink as well as tuning up external storage
>   2. print data sink
>   - should print data in row format in console
>   - purposes
>  - make it easier to test Flink SQL job e2e in IDE
>  - test Flink pipeline and ensure output data format/value is
>  correct
>   3. no output data sink
>   - just swallow output data without doing anything
>   - purpose
>  - evaluate and tune performance of Flink source and the whole
>  pipeline. Users' don't need to worry about sink back pressure
>
> These may be taken into consideration all together as an effort to lower
> the threshold of running Flink SQL/table API, and facilitate users' daily
> work.
>
> Cheers,
> Bowen
>
>
> On Thu, Mar 19, 2020 at 10:32 PM Jingsong Li 
> wrote:
>
> > Hi all,
> >
> > I heard some users complain that table is difficult to test. Now with SQL
> > client, users are more and more inclined to use it to test rather than
> > program.
> > The most common example is Kafka source. If users need to test their SQL
> > output and checkpoint, they need to:
> >
> > - 1.Launch a Kafka standalone, create a Kafka topic .
> > - 2.Write a program, mock input records, and produce records to Kafka
> > topic.
> > - 3.Then test in Flink.
> >
> > The step 1 and 2 are annoying, although this test is E2E.
> >
> > Then I found StatefulSequenceSource, it is very good because it has deal
> > with checkpoint things, so it is very good to checkpoint
> mechanism.Usually,
> > users are turned on checkpoint in production.
> >
> > With computed columns, user are easy to create a sequence source DDL same
> > to Kafka DDL. Then they can test inside Flink, don't need launch other
> > things.
> >
> > Have you consider this? What do you think?
> >
> > CC: @Aljoscha Krettek  the author
> > of StatefulSequenceSource.
> >
> > Best,
> > Jingsong Lee
> >
>


Re: [DISCUSS] Introduce TableFactory for StatefulSequenceSource

2020-03-23 Thread Jark Wu
Hi Jingsong,

Regarding (2) and (3), I was thinking to ignore manually DDL work, so users
can use them directly:

# this will log results to `.out` files
INSERT INTO console
SELECT ...

# this will drop all received records
INSERT INTO blackhole
SELECT ...

Here `console` and `blackhole` are system sinks which is similar to system
functions.

Best,
Jark

On Mon, 23 Mar 2020 at 16:33, Benchao Li  wrote:

> Hi Jingsong,
>
> Thanks for bring this up. Generally, it's a very good proposal.
>
> About data gen source, do you think we need to add more columns with
> various types?
>
> About print sink, do we need to specify the schema?
>
> Jingsong Li  于2020年3月23日周一 下午1:51写道:
>
> > Thanks Bowen, Jark and Dian for your feedback and suggestions.
> >
> > I reorganize with your suggestions, and try to expose DDLs:
> >
> > 1.datagen source:
> > - easy startup/test for streaming job
> > - performance testing
> >
> > DDL:
> > CREATE TABLE user (
> > id BIGINT,
> > age INT,
> > description STRING
> > ) WITH (
> > 'connector.type' = 'datagen',
> > 'connector.rows-per-second'='100',
> > 'connector.total-records'='100',
> >
> > 'schema.id.generator' = 'sequence',
> > 'schema.id.generator.start' = '1',
> >
> > 'schema.age.generator' = 'random',
> > 'schema.age.generator.min' = '0',
> > 'schema.age.generator.max' = '100',
> >
> > 'schema.description.generator' = 'random',
> > 'schema.description.generator.length' = '100'
> > )
> >
> > Default is random generator.
> > Hi Jark, I don't want to bring complicated regularities, because it can
> be
> > done through computed columns. And it is hard to define
> > standard regularities, I think we can leave it to the future.
> >
> > 2.print sink:
> > - easy test for streaming job
> > - be very useful in production debugging
> >
> > DDL:
> > CREATE TABLE print_table (
> > ...
> > ) WITH (
> > 'connector.type' = 'print'
> > )
> >
> > 3.blackhole sink
> > - very useful for high performance testing of Flink
> > - I've also run into users trying UDF to output, not sink, so they need
> > this sink as well.
> >
> > DDL:
> > CREATE TABLE blackhole_table (
> > ...
> > ) WITH (
> > 'connector.type' = 'blackhole'
> > )
> >
> > What do you think?
> >
> > Best,
> > Jingsong Lee
> >
> > On Mon, Mar 23, 2020 at 12:04 PM Dian Fu  wrote:
> >
> > > Thanks Jingsong for bringing up this discussion. +1 to this proposal. I
> > > think Bowen's proposal makes much sense to me.
> > >
> > > This is also a painful problem for PyFlink users. Currently there is no
> > > built-in easy-to-use table source/sink and it requires users to write a
> > lot
> > > of code to trying out PyFlink. This is especially painful for new users
> > who
> > > are not familiar with PyFlink/Flink. I have also encountered the
> tedious
> > > process Bowen encountered, e.g. writing random source connector, print
> > sink
> > > and also blackhole print sink as there are no built-in ones to use.
> > >
> > > Regards,
> > > Dian
> > >
> > > > 在 2020年3月22日,上午11:24,Jark Wu  写道:
> > > >
> > > > +1 to Bowen's proposal. I also saw many requirements on such built-in
> > > > connectors.
> > > >
> > > > I will leave some my thoughts here:
> > > >
> > > >> 1. datagen source (random source)
> > > > I think we can merge the functinality of sequence-source into random
> > > source
> > > > to allow users to custom their data values.
> > > > Flink can generate random data according to the field types, users
> > > > can customize their values to be more domain specific, e.g.
> > > > 'field.user'='User_[1-9]{0,1}'
> > > > This will be similar to kafka-datagen-connect[1].
> > > >
> > > >> 2. console sink (print sink)
> > > > This will be very useful in production debugging, to easily output an
> > > > intermediate view or result view to a `.out` file.
> > > > So that we can look into the data representation, or check d

Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

2020-03-23 Thread Jark Wu
Hi Jingsong, Dawid,

I created https://issues.apache.org/jira/browse/FLINK-16725 to track this
issue. We can continue discussion there.

Best,
Jark

On Thu, 27 Feb 2020 at 10:32, Jingsong Li  wrote:

> Hi Jark,
>
> The matrix I see is SQL cast. If we need bring another conversion matrix
> that is different from SQL cast, I don't understand the benefits. It makes
> me difficult to understand.
> And It seems bad to change the timestamp of different time zones to the
> same value silently.
>
> I have seen a lot of timestamp formats,  SQL, ISO, RFC. I can think that a
> "timestampFormat" could help them to deal with various formats.
> What way do you think can solve all the problems?
>
> Best,
> Jingsong Lee
>
> On Wed, Feb 26, 2020 at 10:45 PM Jark Wu  wrote:
>
>> Hi Jingsong,
>>
>> I don't think it should follow SQL CAST semantics, because it is out of
>> SQL, it happens in connectors which converts users'/external's format into
>> SQL types.
>> I also doubt "timestampFormat" may not work in some cases, because the
>> timestamp format maybe various and mixed in a topic.
>>
>> Best,
>> Jark
>>
>> On Wed, 26 Feb 2020 at 22:20, Jingsong Li  wrote:
>>
>>> Thanks all for your discussion.
>>>
>>> Hi Dawid,
>>>
>>> +1 to apply the logic of parsing a SQL timestamp literal.
>>>
>>> I don't fully understand the matrix your list. Should this be the
>>> semantics of SQL cast?
>>> Do you mean this is implicit cast in JSON parser?
>>> I doubt that because these implicit casts are not support
>>> in LogicalTypeCasts. And it is not so good to understand when it occur
>>> silently.
>>>
>>> How about add "timestampFormat" property to JSON parser? Its default
>>> value is SQL timestamp literal format. And user can configure this.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Wed, Feb 26, 2020 at 6:39 PM Jark Wu  wrote:
>>>
>>>> Hi Dawid,
>>>>
>>>> I agree with you. If we want to loosen the format constraint, the
>>>> important piece is the conversion matrix.
>>>>
>>>> The conversion matrix you listed makes sense to me. From my
>>>> understanding,
>>>> there should be 6 combination.
>>>> We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE =>
>>>> WITH
>>>> TIMEZONE to make the matrix complete.
>>>> When the community reach an agreement on this, we should write it down
>>>> on
>>>> the documentation and follow the matrix in all text-based formats.
>>>>
>>>> Regarding to the RFC 3339 compatibility mode switch, it also sounds
>>>> good to
>>>> me.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz 
>>>> wrote:
>>>>
>>>> > Hi all,
>>>> >
>>>> > @NiYanchun Thank you for reporting this. Yes I think we could improve
>>>> the
>>>> > behaviour of the JSON format.
>>>> >
>>>> > @Jark First of all I do agree we could/should improve the
>>>> > "user-friendliness" of the JSON format (and unify the behavior across
>>>> text
>>>> > based formats). I am not sure though if it is as simple as just
>>>> ignore the
>>>> > time zone here.
>>>> >
>>>> > My suggestion would be rather to apply the logic of parsing a SQL
>>>> > timestamp literal (if the expected type is of
>>>> LogicalTypeFamily.TIMESTAMP),
>>>> > which would actually also derive the "stored" type of the timestamp
>>>> (either
>>>> > WITHOUT TIMEZONE or WITH TIMEZONE) and then apply a proper sql
>>>> conversion.
>>>> > Therefore if the
>>>> >
>>>> > parsed type |requested type|
>>>> behaviour
>>>> >
>>>> > WITHOUT TIMEZONE| WITH TIMEZONE | store the local
>>>> > timezone with the data
>>>> >
>>>> > WITHOUT TIMEZONE| WITH LOCAL TIMEZONE  | do nothing in the
>>>> data,
>>>> > interpret the time in local timezone
>>>> >
>>>> > WITH TIMEZONE  | WITH LOCAL TIMEZONE   | convert the
>>>> timestamp
>>&g

Re: [DISCUSS] FLIP-110: Support LIKE clause in CREATE TABLE

2020-03-24 Thread Jark Wu
+1 to use LIKE and put after schema part.
I also prefer the keyword LIKE than INHERITS, because it's easier to type
and understand, for a non-native English user :)
But I would like to limit a single LIKE clause in the DDL in the first
version. We can allow multiple LIKE clause in the future if needed.

Best,
Jark

On Tue, 24 Mar 2020 at 19:03, Dawid Wysakowicz 
wrote:

> Sorry for a late reply, but I was on vacation.
>
> As for putting the LIKE after the schema part. You're right, sql
> standard lets it be only in the schema part. I was mislead by examples
> for DB2 and MYSQL, which differ from the standard in that respect. My
> bad, sorry.
>
> Nevertheless I'd still be in favour of using the LIKE clause for that
> purpose rather than INHERITS. I'm fine with putting it after the schema
> part. The argument that it applies to the options part make sense to me.
>
> I must admit I am not a fan of the INHERITS clause. @Jar I'd not
> redefine the semantics of the INHERITS clause entirely. I am sure it
> will pose unnecessary confusion if it differs significantly from what
> was implemented for, let's be true, more popular vendors such as
> PostgreSQL. My biggest concern is that the INHERITS clause in PostgreSQL
> allows constructs such as SELECT * FROM ONLY B (where e.g. A INHERITS
> B). My understanding of the purpose of the INHERITS clause is that it
> really emulates inheritance that let's you create "nested" data sets. I
> think what we are more interested in is a way to adjust only the
> metadata of an already existing table.
>
> Moreover I prefer the LIKE clause as it is more widespread. In some way
> it is supported by PostgreSQL, DB2, SnowflakeDB, MySQL.
>
> Lastly @Jingsong, I am not sure about the "link" part. I know at first
> glance having a link and reflecting changes might seem appealing, but I
> am afraid it would pose more threads than it would give benefits. First
> of all it would make the LIKE/INHERITS clause unusable for creating e.g.
> hive tables or jdbc tables that could be used from other systems, as the
> link would not be understandable by those systems.
>
> Best,
>
> Dawid
>
>
>
> On 05/03/2020 07:46, Jark Wu wrote:
> > Hi Dawid,
> >
> >> INHERITS creates a new table with a "link" to the original table.
> > Yes, INHERITS is a "link" to the original table in PostgreSQL.
> > But INHERITS is not SQL standard, I think it's fine for vendors to define
> > theire semantics.
> >
> >> Standard also allows declaring the clause after the schema part. We can
> > also do it.
> > Is that true? I didn't find it in SQL standard. If this is true, I prefer
> > to put LIKE after the schema part.
> >
> > 
> >
> > Hi Jingsong,
> >
> > The concern you mentioned in (2) is exactly my concern too. That's why I
> > suggested INHERITS, or put LIKE after schema part.
> >
> > Best,
> > Jark
> >
> > On Thu, 5 Mar 2020 at 12:05, Jingsong Li  wrote:
> >
> >> Thanks Dawid for starting this discussion.
> >>
> >> I like the "LIKE".
> >>
> >> 1.For "INHERITS", I think this is a good feature too, yes, ALTER TABLE
> will
> >> propagate any changes in column data definitions and check constraints
> down
> >> the inheritance hierarchy. A inherits B, A and B share every things,
> they
> >> have the same kafka topic. If modify schema of B, this means underlying
> >> kafka topic schema changed, so I think it is good to modify A too. If
> this
> >> for "ConfluentSchemaRegistryCatalog" mention by Jark, I think sometimes
> >> this is just we want.
> >> But "LIKE" also very useful for many cases.
> >>
> >> 2.For LIKE statement in schema, I know two kinds of like syntax, one is
> >> MySQL/hive/sqlserver, the other is PostgreSQL. I prefer former:
> >> - In the FLIP, there is "OVERWRITING OPTIONS", this will overwrite
> >> properties in "with"? This looks weird, because "LIKE" is in schema,
> but it
> >> can affect outside properties.
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Wed, Mar 4, 2020 at 2:05 PM Dawid Wysakowicz  >
> >> wrote:
> >>
> >>> Hi Jark,
> >>> I did investigate the INHERITS clause, but it has a semantic that in my
> >>> opinion we definitely don't want to support. INHERITS creates a new
> table
> >>> with a "link" to the original table. There

Re: [DISCUSS] FLIP-95: New TableSource and TableSink interfaces

2020-03-24 Thread Jark Wu
Thanks Timo for updating the formats section. That would be very helpful
for changelog supporting (FLIP-105).

I just left 2 minor comment about some method names. In general, I'm +1 to
start a voting.

--

Hi Becket,

I agree we shouldn't duplicate codes, especiall the runtime
implementations.
However, the interfaces proposed by FLIP-95 are mainly used during
optimization (compiling), not runtime.
I don't think there is much to share for this. Because table/sql
is declarative, but DataStream is imperative.
For example, filter push down, DataStream FilterableSource may allow to
accept a FilterFunction (which is a black box for the source).
However, table sources should pick the pushed filter expressions, some
sources may only support "=", "<", ">" conditions.
Pushing a FilterFunction doesn't work in table ecosystem. That means, the
connectors have to have some table-specific implementations.


Best,
Jark

On Tue, 24 Mar 2020 at 20:41, Kurt Young  wrote:

> Hi Becket,
>
> I don't think DataStream should see some SQL specific concepts such as
> Filtering or ComputedColumn. It's
> better to stay within SQL area and translate to more generic concept when
> translating to DataStream/Runtime
> layer, such as use MapFunction to represent computed column logic.
>
> Best,
> Kurt
>
>
> On Tue, Mar 24, 2020 at 5:47 PM Becket Qin  wrote:
>
> > Hi Timo and Dawid,
> >
> > It's really great that we have the same goal. I am actually wondering if
> we
> > can go one step further to avoid some of the interfaces in Table as well.
> >
> > For example, if we have the FilterableSource, do we still need the
> > FilterableTableSource? Should DynamicTableSource just become a
> > Source<*Row*,
> > SourceSplitT, EnumChkT>?
> >
> > Can you help me understand a bit more about the reason we need the
> > following relational representation / wrapper interfaces v.s. the
> > interfaces that we could put to the Source in FLIP-27?
> >
> > DynamicTableSource v.s. Source
> > SupportsFilterablePushDown v.s. FilterableSource
> > SupportsProjectablePushDown v.s. ProjectableSource
> > SupportsWatermarkPushDown v.s. WithWatermarkAssigner
> > SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
> > ScanTableSource v.s. ChangeLogDeserializer.
> > LookUpTableSource v.s. LookUpSource
> >
> > Assuming we have all the interfaces on the right side, do we still need
> the
> > interfaces on the left side? Note that the interfaces on the right can be
> > used by both DataStream and Table. If we do this, there will only be one
> > set of Source interfaces Table and DataStream, the only difference is
> that
> > the Source for table will have some specific plugins and configurations.
> An
> > omnipotent Source can implement all the the above interfaces and take a
> > Deserializer that implements both ComputedColumnDeserializer and
> > ChangeLogDeserializer.
> >
> > Would the SQL planner work with that?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >
> >
> >
> > On Tue, Mar 24, 2020 at 5:03 PM Jingsong Li 
> > wrote:
> >
> > > +1. Thanks Timo for the design doc.
> > >
> > > We can also consider @Experimental too. But I am +1 to @PublicEvolving,
> > we
> > > should be confident in the current change.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Tue, Mar 24, 2020 at 4:30 PM Timo Walther 
> wrote:
> > >
> > > > @Becket: We totally agree that we don't need table specific
> connectors
> > > > during runtime. As Dawid said, the interfaces proposed here are just
> > for
> > > > communication with the planner. Once the properties (watermarks,
> > > > computed column, filters, projecttion etc.) are negotiated, we can
> > > > configure a regular Flink connector.
> > > >
> > > > E.g. setting the watermark assigner and deserialization schema of a
> > > > Kafka connector.
> > > >
> > > > For better separation of concerns, Flink connectors should not
> include
> > > > relational interfaces and depend on flink-table. This is the
> > > > responsibility of table source/sink.
> > > >
> > > > @Kurt: I would like to mark them @PublicEvolving already because we
> > need
> > > > to deprecate the old interfaces as early as possible. We cannot
> > redirect
> > > > to @Internal interfaces. They are not marked @Public, so we can still
> > > > evolve them. But a core design shift should not happen again, it
> would
> > > > leave a bad impression if we are redesign over and over again.
> Instead
> > > > we should be confident in the current change.
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > >
> > > > On 24.03.20 09:20, Dawid Wysakowicz wrote:
> > > > > Hi Becket,
> > > > >
> > > > > Answering your question, we have the same intention not to
> duplicate
> > > > > connectors between datastream and table apis. The interfaces
> proposed
> > > in
> > > > > the FLIP are a way to describe relational properties of a source.
> The
> > > > > intention is as you des

Re: [DISCUSS] FLIP-95: New TableSource and TableSink interfaces

2020-03-24 Thread Jark Wu
shDown {
> >
> >   applyFilters(List filters) {
> >
> > this.filters = convertToDataStreamFilters(filters);
> >
> >   }
> >
> >   Source createSource() {
> >
> > return Source.create()
> >
> >   .applyFilters(this.filters);
> >
> >}
> >
> > }
> >
> > or exactly as you said for the computed columns:
> >
> >
> > SupportsComputedColumnsPushDown {
> >
> >
> >
> >   applyComputedColumn(ComputedColumnConverter converter) {
> >
> > this.deserializationSchema = new DeserializationSchema {
> >
> >   Row deserialize(...) {
> >
> > RowData row = format.deserialize(bytes); // original format, e.g
> > json, avro, etc.
> >
> > RowData enriched = converter(row)
> >
> >   }
> >
> > }
> >
> >   }
> >
> >   Source createSource() {
> >
> > return Source.create()
> >
> >   .withDeserialization(deserializationSchema);
> >
> >}
> >
> > }
> >
> > So to sum it up again, all those interfaces are factories that configure
> > appropriate parts of the DataStream API using Table API concepts. Finally
> > to answer you question for particular comparisons:
> >
> > DynamicTableSource v.s. Source
> > SupportsFilterablePushDown v.s. FilterableSource
> > SupportsProjectablePushDown v.s. ProjectableSource
> > SupportsWatermarkPushDown v.s. WithWatermarkAssigner
> > SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
> > ScanTableSource v.s. ChangeLogDeserializer.
> >
> > pretty much you can think of all on the left as factories for the right
> > side, left side works with Table API classes (Expressions, DataTypes). I
> > hope this clarifies it a bit.
> >
> > Best,
> >
> > Dawid
> > On 24/03/2020 15:03, Becket Qin wrote:
> >
> > Hey Kurt,
> >
> > I don't think DataStream should see some SQL specific concepts such as
> >
> > Filtering or ComputedColumn.
> >
> > Projectable and Filterable seems not necessarily SQL concepts, but could
> be
> > applicable to DataStream source as well to reduce the network load. For
> > example ORC and Parquet should probably also be readable from DataStream,
> > right?
> >
> > ComputedColumn is not part of the Source, it is an interface extends the
> > Deserializer, which is a pluggable for the Source. From the SQL's
> > perspective it has the concept of computed column, but from the Source
> > perspective, It is essentially a Deserializer which also converts the
> > records internally, assuming we allow some conversion to be embedded to
> > the source in addition to just deserialization.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Mar 24, 2020 at 9:36 PM Jark Wu  <
> imj...@gmail.com> wrote:
> >
> >
> > Thanks Timo for updating the formats section. That would be very helpful
> > for changelog supporting (FLIP-105).
> >
> > I just left 2 minor comment about some method names. In general, I'm +1
> to
> > start a voting.
> >
> >
> >
> --
> >
> > Hi Becket,
> >
> > I agree we shouldn't duplicate codes, especiall the runtime
> > implementations.
> > However, the interfaces proposed by FLIP-95 are mainly used during
> > optimization (compiling), not runtime.
> > I don't think there is much to share for this. Because table/sql
> > is declarative, but DataStream is imperative.
> > For example, filter push down, DataStream FilterableSource may allow to
> > accept a FilterFunction (which is a black box for the source).
> > However, table sources should pick the pushed filter expressions, some
> > sources may only support "=", "<", ">" conditions.
> > Pushing a FilterFunction doesn't work in table ecosystem. That means, the
> > connectors have to have some table-specific implementations.
> >
> >
> > Best,
> > Jark
> >
> > On Tue, 24 Mar 2020 at 20:41, Kurt Young  <
> ykt...@gmail.com> wrote:
> >
> >
> > Hi Becket,
> >
> > I don't think DataStream should see some SQL specific concepts such as
> > Filtering or ComputedColumn. It's
> > better to stay within SQL area and translate to more generic concept when
> > translating to DataStream/Runtime
> > layer, such

Re: [DISCUSS] FLIP-84 Feedback Summary

2020-03-25 Thread Jark Wu
Hi Godfrey,

The changes sounds good to me. +1 to start another voting.

A minor question: does the ResultKind contain an ERROR kind?

Best,
Jark


On Wed, 25 Mar 2020 at 18:51, Timo Walther  wrote:

> Hi Godfrey,
>
> thanks for starting the discussion on the mailing list. And sorry again
> for the late reply to FLIP-84. I have updated the Google doc one more
> time to incorporate the offline discussions.
>
>  From Dawid's and my view, it is fine to postpone the multiline support
> to a separate method. This can be future work even though we will need
> it rather soon.
>
> If there are no objections, I suggest to update the FLIP-84 again and
> have another voting process.
>
> Thanks,
> Timo
>
>
> On 25.03.20 11:17, godfrey he wrote:
> > Hi community,
> > Timo, Fabian and Dawid have some feedbacks about FLIP-84[1]. The
> feedbacks
> > are all about new introduced methods. We had a discussion yesterday, and
> > most of feedbacks have been agreed upon. Here is the conclusions:
> >
> > *1. about proposed methods in `TableEnvironment`:*
> >
> > the original proposed methods:
> >
> > TableEnvironment.createDmlBatch(): DmlBatch
> > TableEnvironment.executeStatement(String statement): ResultTable
> >
> > the new proposed methods:
> >
> > // we should not use abbreviations in the API, and the term "Batch" is
> > easily confused with batch/streaming processing
> > TableEnvironment.createStatementSet(): StatementSet
> >
> > // every method that takes SQL should have `Sql` in its name
> > // supports multiline statement ???
> > TableEnvironment.executeSql(String statement): TableResult
> >
> > // new methods. supports explaining DQL and DML
> > TableEnvironment.explainSql(String statement, ExplainDetail... details):
> > String
> >
> >
> > *2. about proposed related classes:*
> >
> > the original proposed classes:
> >
> > interface DmlBatch {
> >  void addInsert(String insert);
> >  void addInsert(String targetPath, Table table);
> >  ResultTable execute() throws Exception ;
> >  String explain(boolean extended);
> > }
> >
> > public interface ResultTable {
> >  TableSchema getResultSchema();
> >  Iterable getResultRows();
> > }
> >
> > the new proposed classes:
> >
> > interface StatementSet {
> >  // every method that takes SQL should have `Sql` in its name
> >  // return StatementSet instance for fluent programming
> >  addInsertSql(String statement): StatementSet
> >
> >  // return StatementSet instance for fluent programming
> >  addInsert(String tablePath, Table table): StatementSet
> >
> >  // new method. support overwrite mode
> >  addInsert(String tablePath, Table table, boolean overwrite):
> > StatementSet
> >
> >  explain(): String
> >
> >  // new method. supports adding more details for the result
> >  explain(ExplainDetail... extraDetails): String
> >
> >  // throw exception ???
> >  execute(): TableResult
> > }
> >
> > interface TableResult {
> >  getTableSchema(): TableSchema
> >
> >  // avoid custom parsing of an "OK" row in programming
> >  getResultKind(): ResultKind
> >
> >  // instead of `get` make it explicit that this is might be
> triggering
> > an expensive operation
> >  collect(): Iterable
> >
> >  // for fluent programming
> >  print(): Unit
> > }
> >
> > enum ResultKind {
> >  SUCCESS, // for DDL, DCL and statements with a simple "OK"
> >  SUCCESS_WITH_CONTENT, // rows with important content are available
> > (DML, DQL)
> > }
> >
> >
> > *3. new proposed methods in `Table`*
> >
> > `Table.insertInto()` will be deprecated, and the following methods are
> > introduced:
> >
> > Table.executeInsert(String tablePath): TableResult
> > Table.executeInsert(String tablePath, boolean overwrite): TableResult
> > Table.explain(ExplainDetail... details): String
> > Table.execute(): TableResult
> >
> > There are two issues need further discussion, one is whether
> > `TableEnvironment.executeSql(String statement): TableResult` needs to
> > support multiline statement (or whether `TableEnvironment` needs to
> support
> > multiline statement), and another one is whether `StatementSet.execute()`
> > needs to throw exception.
> >
> > please refer to the feedback document [2] for the details.
> >
> > Any suggestions are warmly welcomed!
> >
> > [1]
> >
> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> > [2]
> >
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
> >
> > Best,
> > Godfrey
> >
>
>


Re: [DISCUSS]FLIP-113: Support SQL and planner hints

2020-03-26 Thread Jark Wu
Hi Danny,

Regarding to `supportedHintOptions()` interface, I suggest to use the
inverted version, `unsupportedHintOptions()`.
Because I think the disallowed list is much smaller.
In addition, it's hard to list all the properties under
`connector.properties.*`.
But we know `connector.properties.bootstrap.servers` and
`connector.properties.zookeeper.connect` are the only security options.

Best,
Jark

On Thu, 26 Mar 2020 at 16:47, Kurt Young  wrote:

> Hi Danny,
>
> Thanks for the updates. I have 2 comments regarding to latest document:
>
> 1) I think we also need `*supportedHintOptions*` for
> `*TableFormatFactory*`
> 2) IMO "dynamic-table-options.enabled" should belong to `
> *OptimizerConfigOptions*`
>
> Best,
> Kurt
>
>
> On Thu, Mar 26, 2020 at 4:40 PM Timo Walther  wrote:
>
> > Thanks for the update Danny. +1 for this proposal.
> >
> > Regards,
> > Timo
> >
> > On 26.03.20 04:51, Danny Chan wrote:
> > > Thanks everyone who engaged in this discussion ~
> > >
> > > Our goal is "Supports Dynamic Table Options for Flink SQL". After an
> > > offline discussion with Kurt, Timo and Dawid, we have made the final
> > > conclusion, here is the summary:
> > >
> > >
> > > - Use comment style syntax to specify the dynamic table options:
> "/*+
> > > *OPTIONS*(k1='v1', k2='v2') */"
> > > - Have constraint on the options keys: the options that may bring
> in
> > > security problems should not be allowed, i.e. Kafka connector
> > zookeeper
> > > endpoint URL and topic name
> > > - Use white-list to control the allowed options for each connector,
> > > which is more safe for future extention
> > > - We allow to enable/disable this feature globally
> > > - Implement based on the current code base first, and when FLIP-95
> is
> > > checked in, implement this feature based on new interface
> > >
> > > Any suggestions are appreciated ~
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
> > >
> > > Best,
> > > Danny Chan
> > >
> > > Jark Wu  于2020年3月18日周三 下午10:38写道:
> > >
> > >> Hi everyone,
> > >>
> > >> Sorry, but I'm not sure about the `supportedHintOptions`. I'm afraid
> it
> > >> doesn't solve the problems but increases some development and learning
> > >> burdens.
> > >>
> > >> # increase development and learning burden
> > >>
> > >> According to the discussion so far, we want to support overriding a
> > subset
> > >> of options in hints which doesn't affect semantics.
> > >> With the `supportedHintOptions`, it's up to the connector developers
> to
> > >> decide which options will not affect semantics, and to be hint
> options.
> > >> However, the question is how to distinguish whether an option will
> > *affect
> > >> semantics*? What happens if an option will affect semantics but
> > provided as
> > >> hint options?
> > >>  From my point of view, it's not easy to distinguish. For example, the
> > >> "format.ignore-parse-error" can be a very useful dynamic option but
> that
> > >> will affect semantic, because the result is different (null vs
> > exception).
> > >> Another example, the "connector.lookup.cache.*" options are also very
> > >> useful to tune jobs, however, it will also affect the job results. I
> can
> > >> come up many more useful options but may affect semantics.
> > >>
> > >> I can see that the community will under endless discussion around "can
> > this
> > >> option to be a hint option?",  "wether this option will affect
> > semantics?".
> > >> You can also find that we already have different opinions on
> > >> "ignore-parse-error". Those discussion is a waste of time! That's not
> > what
> > >> users want!
> > >> The problem is user need this, this, this options and HOW to expose
> > them?
> > >> We should focus on that.
> > >>
> > >> Then there could be two endings in the future:
> > >> 1) compromise on the usability, we drop the rule that hints don't
> affect
> > >> semantics, allow all the useful options in the hints list.

Re: [VOTE] FLIP-115: Filesystem connector in Table

2020-03-29 Thread Jark Wu
+1 (binding)

Best,
Jark

On Mon, 30 Mar 2020 at 11:32, Jingsong Li  wrote:

> +1 (binding)
>
> Best,
> Jingsong Lee
>
> On Sun, Mar 29, 2020 at 10:37 PM Benchao Li  wrote:
>
> > +1 (non-binding)
> >
> > This feature will greatly enrich Flink SQL's ecology~
> >
> > Leonard Xu  于2020年3月29日周日 下午9:44写道:
> >
> > > +1
> > >
> > > Best,
> > > Leonard
> > >
> > > > 在 2020年3月28日,16:25,Kurt Young  写道:
> > > >
> > > > +1
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Fri, Mar 27, 2020 at 10:51 AM Jingsong Li  >
> > > wrote:
> > > >
> > > >> Hi everyone,
> > > >>
> > > >> I'd like to start the vote of FLIP-115 [1], which introduce
> Filesystem
> > > >> table factory in table. This FLIP is discussed in the thread[2].
> > > >>
> > > >> The vote will be open for at least 72 hours. Unless there is an
> > > objection,
> > > >> I will try to close it by March 30, 2020 03:00 UTC if we have
> received
> > > >> sufficient votes.
> > > >>
> > > >> [1]
> > > >>
> > > >>
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-115-Filesystem-connector-in-Table-td38870.html
> > > >>
> > > >> Best
> > > >> Jingsong Lee
> > > >>
> > >
> > >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>
>
> --
> Best, Jingsong Lee
>


[DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-29 Thread Jark Wu
Hi everyone,

I want to start a discussion about further improve and simplify our current
connector porperty keys, aka WITH options. Currently, we have a
'connector.' prefix for many properties, but they are verbose, and we see a
big inconsistency between the properties when designing FLIP-107.

So we propose to remove all the 'connector.' prefix and rename
'connector.type' to 'connector', 'format.type' to 'format'. So a new Kafka
DDL may look like this:

CREATE TABLE kafka_table (
 ...
) WITH (
 'connector' = 'kafka',
 'version' = '0.10',
 'topic' = 'test-topic',
 'startup-mode' = 'earliest-offset',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'format.fail-on-missing-field' = 'false'
);

The new connector property key set will come together with new Factory
inferface which is proposed in FLIP-95. Old properties are still compatible
with their existing implementation. New properties are only available in
new DynamicTableFactory implementations.

You can access the detailed FLIP here:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

Best,
Jark


Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-30 Thread Jark Wu
Hi Kurt,

That's good questions.

> the meaning of "version"
There are two versions in the old design. One is property version
"connector.property-version" which can be used for backward compatibility.
The other one is "connector.version" which defines the version of external
system, e.g. 0.11" for kafka, "6" or "7" for ES.
In this proposal, the "version" is the previous "connector.version". The
""connector.property-version" is not introduced in new design.

> how to keep the old capability which can evolve connector properties
The "connector.property-version" is an optional key in the old design and
is never bumped up.
I'm not sure how "connector.property-version" should work in the initial
design. Maybe @Timo Walther  has more knowledge on
this.
But for the new properties, every options should be expressed as
`ConfigOption` which provides `withDeprecatedKeys(...)` method to easily
support evolving keys.

> a single keys instead of two, e.g. "kafka-0.11", "kafka-universal"?
There are several benefit to use separate "version" key I can see:
1) it's more readable to separete them into different keys, because they
are orthogonal concepts.
2) the planner can give all the availble versions in the exception message,
if user uses a wrong version (this is often reported in user ML).
3) If we use "kafka-0.11" as connector identifier, we may have to write a
full documentation for each version, because they are different "connector"?
IMO, for 0.11, 0.11, etc... kafka, they are actually the same connector
but with different "client jar" version,
they share all the same supported property keys and should reside
together.
4) IMO, the future vision is version-free. At some point in the future, we
may don't need users to specify kafka version anymore, and make
"version=universal" as optional or removed in the future. This is can be
done easily if they are separate keys.

Best,
Jark


On Mon, 30 Mar 2020 at 14:45, Kurt Young  wrote:

> Hi Jark,
>
> Thanks for the proposal, I'm +1 to the general idea. However I have a
> question about "version",
> in the old design, the version seems to be aimed for tracking property
> version, with different
> version, we could evolve these step by step without breaking backward
> compatibility. But in this
> design, version is representing external system's version, like "0.11" for
> kafka, "6" or "7" for ES.
> I'm not sure if this is necessary, what's the benefit of using two keys
> instead of one, like "kafka-0.11"
> or "ES6" directly? And how about the old capability which could let us
> evolving connector properties?
>
> Best,
> Kurt
>
>
> On Mon, Mar 30, 2020 at 2:36 PM LakeShen 
> wrote:
>
> > Hi Jark,
> >
> > I am really looking forward to this feature. I think this feature
> > could simplify flink sql code,and at the same time ,
> > it could make the developer more easlier to config the flink sql WITH
> > options.
> >
> > Now when I am using flink sql to write flink task , sometimes I think the
> > WITH options is too long for user.
> > For example,I config the kafka source connector parameter,for consumer
> > group and brokers parameter:
> >
> >   'connector.properties.0.key' = 'group.id'
> > >  , 'connector.properties.0.value' = 'xxx'
> > >  , 'connector.properties.1.key' = 'bootstrap.servers'
> > >  , 'connector.properties.1.value' = 'x'
> > >
> >
> > I can understand this config , but for the flink fresh man,maybe it
> > is confused for him.
> > In my thought, I am really looking forward to this feature,thank you to
> > propose this feature.
> >
> > Best wishes,
> > LakeShen
> >
> >
> > Jark Wu  于2020年3月30日周一 下午2:02写道:
> >
> > > Hi everyone,
> > >
> > > I want to start a discussion about further improve and simplify our
> > current
> > > connector porperty keys, aka WITH options. Currently, we have a
> > > 'connector.' prefix for many properties, but they are verbose, and we
> > see a
> > > big inconsistency between the properties when designing FLIP-107.
> > >
> > > So we propose to remove all the 'connector.' prefix and rename
> > > 'connector.type' to 'connector', 'format.type' to 'format'. So a new
> > Kafka
> > > DDL may look like this:
> > >
&

Re: [VOTE] FLIP-95: New TableSource and TableSink interfaces

2020-03-30 Thread Jark Wu
+1 from my side.

Thanks Timo for driving this.

Best,
Jark

On Mon, 30 Mar 2020 at 15:36, Timo Walther  wrote:

> Hi all,
>
> I would like to start the vote for FLIP-95 [1], which is discussed and
> reached a consensus in the discussion thread [2].
>
> The vote will be open until April 2nd (72h), unless there is an
> objection or not enough votes.
>
> Thanks,
> Timo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> [2]
>
> https://lists.apache.org/thread.html/r03cbce8996fd06c9b0406c9ddc0d271bd456f943f313b9261fa061f9%40%3Cdev.flink.apache.org%3E
>


Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-30 Thread Jark Wu
Hi Kurt,

> 2) Lists all available connectors seems also quite straightforward, e.g
user provided a wrong "kafka-0.8", we tell user we have candidates of
"kakfa-0.11", "kafka-universal"
It's not possible for the framework to throw such exception. Because the
framework doesn't know what versions do the connector support. All the
version information is a blackbox in the identifier. But with
`Factory#factoryVersion()` interface, we can know all the supported
versions.

> 3) I don't think so. We can still treat it as the same connector but with
different versions.
That's true but that's weird. Because from the plain DDL definition, they
look like different connectors with different "connector" value, e.g.
'connector=kafka-0.8', 'connector=kafka-0.10'.

> If users don't set any version, we will use "kafka-universal" instead.
The behavior is inconsistent IMO.
That is a long term vision when there is no kafka clusters with <0.11
version.
At that point, "universal" is the only supported version in Flink and the
"version" key can be optional.

-

Hi Jingsong,

> "version" vs "kafka.version"
I though about it. But if we prefix "kafka" to version, we should prefix
"kafka" for all other property keys, because they are all kafka specific
options.
However, that will make the property set verbose, see rejected option#2 in
the FLIP.

> explicitly separate options for source and sink
That's a good topic. It's good to have a guideline for the new property
keys.
I'm fine to prefix with a "source"/"sink" for some connector keys.
Actually, we already do this in some connectors, e.g. jdbc and hbase.

Best,
Jark

On Mon, 30 Mar 2020 at 16:36, Jingsong Li  wrote:

> Thanks Jark for the proposal.
>
> +1 to the general idea.
>
> For "version", what about "kafka.version"? It is obvious to know its
> meaning.
>
> And I'd like to start a new topic:
> Should we need to explicitly separate source from sink?
> With the development of batch and streaming, more and more connectors have
> both source and sink.
>
> So should we set a rule for table properties:
> - properties for both source and sink: without prefix, like "topic"
> - properties for source only: with "source." prefix, like
> "source.startup-mode"
> - properties for sink only: with "sink." prefix, like "sink.partitioner"
>
> What do you think?
>
> Best,
> Jingsong Lee
>
> On Mon, Mar 30, 2020 at 3:56 PM Jark Wu  wrote:
>
> > Hi Kurt,
> >
> > That's good questions.
> >
> > > the meaning of "version"
> > There are two versions in the old design. One is property version
> > "connector.property-version" which can be used for backward
> compatibility.
> > The other one is "connector.version" which defines the version of
> external
> > system, e.g. 0.11" for kafka, "6" or "7" for ES.
> > In this proposal, the "version" is the previous "connector.version". The
> > ""connector.property-version" is not introduced in new design.
> >
> > > how to keep the old capability which can evolve connector properties
> > The "connector.property-version" is an optional key in the old design and
> > is never bumped up.
> > I'm not sure how "connector.property-version" should work in the initial
> > design. Maybe @Timo Walther  has more knowledge on
> > this.
> > But for the new properties, every options should be expressed as
> > `ConfigOption` which provides `withDeprecatedKeys(...)` method to easily
> > support evolving keys.
> >
> > > a single keys instead of two, e.g. "kafka-0.11", "kafka-universal"?
> > There are several benefit to use separate "version" key I can see:
> > 1) it's more readable to separete them into different keys, because they
> > are orthogonal concepts.
> > 2) the planner can give all the availble versions in the exception
> message,
> > if user uses a wrong version (this is often reported in user ML).
> > 3) If we use "kafka-0.11" as connector identifier, we may have to write a
> > full documentation for each version, because they are different
> > "connector"?
> > IMO, for 0.11, 0.11, etc... kafka, they are actually the same
> connector
> > but with different "client jar" version,
> > they share all the same supported property keys and should reside
> > to

Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-30 Thread Jark Wu
x27; side, using one property to match a connector will
> >> be easier. Especially we have many connectors,
> >> and some of the need version property required, and some of them not.
> >>
> >> Regarding Jingsong's suggestion,
> >> IMO, it's a very good complement to the FLIP. Distinguishing
> >> properties for source and sink can be very useful, and
> >> also this will make the connector property more precise.
> >> We are also sick of this for now, we cannot know whether a DDL is a
> >> source or sink unless we look through all queries where
> >> the table is used.
> >> Even more, some of the required properties are only required for
> >> source, bug we cannot leave it blank for sink, and vice versa.
> >> I think we can also add a type for dimension tables except source and
> >> sink.
> >>
> >> Kurt Young mailto:ykt...@gmail.com>> 于2020年3月30日
> >> 周一 下午8:16写道:
> >>
> >>  > It's not possible for the framework to throw such exception.
> >> Because the
> >> framework doesn't know what versions do the connector support.
> >>
> >> Not really, we can list all valid connectors framework could
> >> found. E.g.
> >> user mistyped 'kafka-0.x', the error message will looks like:
> >>
> >> we don't have any connector named "kafka-0.x", but we have:
> >> FileSystem
> >> Kafka-0.10
> >> Kafka-0.11
> >> ElasticSearch6
> >> ElasticSearch7
> >>
> >> Best,
> >> Kurt
> >>
> >>
> >> On Mon, Mar 30, 2020 at 5:11 PM Jark Wu  >> <mailto:imj...@gmail.com>> wrote:
> >>
> >>  > Hi Kurt,
> >>  >
> >>  > > 2) Lists all available connectors seems also quite
> >> straightforward, e.g
> >>  > user provided a wrong "kafka-0.8", we tell user we have
> >> candidates of
> >>  > "kakfa-0.11", "kafka-universal"
> >>  > It's not possible for the framework to throw such exception.
> >> Because the
> >>  > framework doesn't know what versions do the connector support.
> >> All the
> >>  > version information is a blackbox in the identifier. But with
> >>  > `Factory#factoryVersion()` interface, we can know all the
> >> supported
> >>  > versions.
> >>  >
> >>  > > 3) I don't think so. We can still treat it as the same
> >> connector but with
> >>  > different versions.
> >>  > That's true but that's weird. Because from the plain DDL
> >> definition, they
> >>  > look like different connectors with different "connector"
> >> value, e.g.
> >>  > 'connector=kafka-0.8', 'connector=kafka-0.10'.
> >>  >
> >>  > > If users don't set any version, we will use "kafka-universal"
> >> instead.
> >>  > The behavior is inconsistent IMO.
> >>  > That is a long term vision when there is no kafka clusters
> >> with <0.11
> >>  > version.
> >>  > At that point, "universal" is the only supported version in Flink
> >> and the
> >>  > "version" key can be optional.
> >>  >
> >>  > -
> >>  >
> >>  > Hi Jingsong,
> >>  >
> >>  > > "version" vs "kafka.version"
> >>  > I though about it. But if we prefix "kafka" to version, we should
> >> prefix
> >>  > "kafka" for all other property keys, because they are all kafka
> >> specific
> >>  > options.
> >>  > However, that will make the property set verbose, see rejected
> >> option#2 in
> >>  > the FLIP.
> >>  >
> >>  > > explicitly separate options for source and sink
> >>  > That's a good topic. It's good to have a guideline for the new
> >> property
> >>  > keys.
> >>  > I'm fine to prefix with a "source"/"sink" for some connector
> >> keys.
> >>  > Actually, we already do this in some conn

Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-30 Thread Jark Wu
Hi all,

Thanks for the feedbacks.

It seems that we have a conclusion to put the version into the factory
identifier. I'm also fine with this.
If we have this outcome, the interface of Factory#factoryVersion is not
needed anymore, this can simplify the learning cost of new factory.
We may need to update FLIP-95 and re-vote for it? cc @Timo Walther


kafka  => kafka for 0.11+ versions, we don't suffix "-universal", because
the meaning of "universal" not easy to understand.
kafka-0.11 => kafka for 0.11 version
kafka-0.10 => kafka for 0.10 version
elasticsearch-6 => elasticsearch for 6.x versions
elasticsearch-7 => elasticsearch for 7.x versions
hbase-1.4 => hbase for 1.4.x versions
jdbc
filesystem

We use "-" as the version delimiter to make them to be more consistent.
This is not forces, users can also use other delimiters or without
delimiter.
But this can be a guilde in the Javadoc of Factory, to make the connector
ecosystem to be more consistent.

What do you think?



Regarding "connector.property-version":

Hi @Dawid Wysakowicz  , the new fatories are
designed not support to read current properties.
All the current properties are routed to the old factories if they are
using "connector.type". Otherwise, properties are routed to new factories.

If I understand correctly, the "connector.property-version" is attched
implicitly by system, not manually set by users.
For example, the framework should add "connector.property-version=1" to
properties when processing DDL statement.
I'm fine to add a "connector.property-version=1" when processing DDL
statement, but I think it's also fine if we don't,
because this can be done in the future if need and the default version can
be 1.

Best,
Jark

On Tue, 31 Mar 2020 at 00:36, Jark Wu  wrote:

> Hi all,
>
> Thanks for the feedbacks.
>
> It seems that we have a conclusion to put the version into the factory
> identifier. I'm also fine with this.
> If we have this outcome, the interface of Factory#factoryVersion is not
> needed anymore, this can simplify the learning cost of new factory.
> We may need to update FLIP-95 and re-vote for it? cc @Timo Walther
> 
>
> Btw, I would like to use "_" instead of "-" as the version delimiter,
> because "-" looks like minus and may confuse users, e.g. "elasticsearch-6".
> This is not forced, but should be a guilde in the Javadoc of Factory.
> I propose to use the following identifiers for existing connectors,
>
> kafka  => kafka for 0.11+ versions, we don't suffix "-universal", because
> the meaning of "universal" not easy to understand.
> kafka-0.11 => kafka for 0.11 version
> kafka-0.10 => kafka for 0.10 version
> elasticsearch-6 => elasticsearch for 6.x versions
> elasticsearch-7 => elasticsearch for 7.x versions
> hbase-1.4 => hbase for 1.4.x versions
> jdbc
> filesystem
>
> We use "-" as the version delimiter to make them to be more consistent.
> This is not forces, users can also use other delimiters or without
> delimiter.
> But this can be a guilde in the Javadoc of Factory, to make the connector
> ecosystem to be more consistent.
>
> What do you think?
>
> 
>
> Regarding "connector.property-version":
>
> Hi @Dawid Wysakowicz  , the new fatories are
> designed not support to read current properties.
> All the current properties are routed to the old factories if they are
> using "connector.type". Otherwise, properties are routed to new factories.
>
> If I understand correctly, the "connector.property-version" is attched
> implicitly by system, not manually set by users.
> For example, the framework should add "connector.property-version=1" to
> properties when processing DDL statement.
> I'm fine to add a "connector.property-version=1" when processing DDL
> statement, but I think it's also fine if we don't,
> because this can be done in the future if need and the default version can
> be 1.
>
> Best,
> Jark
>
>
>
>
>
> On Mon, 30 Mar 2020 at 23:21, Dawid Wysakowicz 
> wrote:
>
>> Hi all,
>>
>> I like the overall design of the FLIP.
>>
>> As for the withstanding concerns. I kind of like the approach to put the
>> version into the factory identifier. I think it's the cleanest way to
>> say that this version actually applies to the connector itself and not
>> to the system it connects to. BTW, I think the outcome of this
>> discussion will affect interfaces described in FLIP-95. If we put the
>> version into the functionIdentifier, do we need Factory#

Re: [DISCUSS] FLIP-84 Feedback Summary

2020-03-30 Thread Jark Wu
:
> >>
> >> USE CATALOG 'mycat';
> >> INSERT INTO t1 SELECT * FROM s;
> >> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>
> >> For executeMultilineSql():
> >>
> >> sync because regular SQL
> >> sync because regular Batch SQL
> >> async because Streaming SQL
> >>
> >> For executeAsyncMultilineSql():
> >>
> >> async because everything should be async
> >> async because everything should be async
> >> async because everything should be async
> >>
> >> What we should not start for executeAsyncMultilineSql():
> >>
> >> sync because DDL
> >> async because everything should be async
> >> async because everything should be async
> >>
> >> What are you thoughts here?
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 26.03.20 12:50, godfrey he wrote:
> >>> Hi Timo,
> >>>
> >>> I agree with you that streaming queries mostly need async execution.
> >>> In fact, our original plan is only introducing sync methods in this
> FLIP,
> >>> and async methods (like "executeSqlAsync") will be introduced in the
> >> future
> >>> which is mentioned in the appendix.
> >>>
> >>> Maybe the async methods also need to be considered in this FLIP.
> >>>
> >>> I think sync methods is also useful for streaming which can be used to
> >> run
> >>> bounded source.
> >>> Maybe we should check whether all sources are bounded in sync execution
> >>> mode.
> >>>
> >>>> Also, if we block for streaming queries, we could never support
> >>>> multiline files. Because the first INSERT INTO would block the further
> >>>> execution.
> >>> agree with you, we need async method to submit multiline files,
> >>> and files should be limited that the DQL and DML should be always in
> the
> >>> end for streaming.
> >>>
> >>> Best,
> >>> Godfrey
> >>>
> >>> Timo Walther  于2020年3月26日周四 下午4:29写道:
> >>>
> >>>> Hi Godfrey,
> >>>>
> >>>> having control over the job after submission is a requirement that was
> >>>> requested frequently (some examples are [1], [2]). Users would like to
> >>>> get insights about the running or completed job. Including the jobId,
> >>>> jobGraph etc., the JobClient summarizes these properties.
> >>>>
> >>>> It is good to have a discussion about synchronous/asynchronous
> >>>> submission now to have a complete execution picture.
> >>>>
> >>>> I thought we submit streaming queries mostly async and just wait for
> the
> >>>> successful submission. If we block for streaming queries, how can we
> >>>> collect() or print() results?
> >>>>
> >>>> Also, if we block for streaming queries, we could never support
> >>>> multiline files. Because the first INSERT INTO would block the further
> >>>> execution.
> >>>>
> >>>> If we decide to block entirely on streaming queries, we need the async
> >>>> execution methods in the design already. However, I would rather go
> for
> >>>> non-blocking streaming queries. Also with the `EMIT STREAM` key word
> in
> >>>> mind that we might add to SQL statements soon.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
> >>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
> >>>>
> >>>> On 25.03.20 16:30, godfrey he wrote:
> >>>>> Hi Timo,
> >>>>>
> >>>>> Thanks for the updating.
> >>>>>
> >>>>> Regarding to "multiline statement support", I'm also fine that
> >>>>> `TableEnvironment.executeSql()` only supports single line statement,
> >> and
> >>>> we
> >>>>> can support multiline statement later (needs more discussion about
> >> this).
> >>>>>
> >>>>> Regarding to "StatementSet.explian()", I don't have strong opinions
> >> about
> >>>>> that.
> >>>>>
> >>>>> Regarding to "TableResult.getJobClient(

Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-30 Thread Jark Wu
Hi Kurt,

I also prefer "-" as version delimiter now. I didn't remove the "_"
proposal by mistake, that's why I sent another email last night :)
Regarding to "property-version", I also think we shouldn't let users to
learn about this. And ConfigOption provides a good ability
to support deprecated keys and auto-generate documentation for deprecated
keys.

Hi Danny,

Regarding to “connector.properties.*”:
In FLIP-95, the Factory#requiredOptions() and Factory#optionalOptions()
inferfaces are only used for generation of documentation.
It does not influence the discovery and validation of a factory. The
validation logic is defined by connectors
in createDynamicTableSource/Sink().
So you don't have to provide an option for "connector.properties.*". But I
think we should make ConfigOption support wildcard in the long term for a
full story.

I don't think we should inline all the "connector.properties.*", otherwise,
it will be very tricky for users to configure the properties.
Regarding to FLIP-113, I suggest to provide some ConfigOptions for commonly
used kafka properties and put them in the supportedHintOptions(),
e.g. "connector.properties.group.id",
"connector.properties.fetch.min.bytes".

Best,
Jark





On Tue, 31 Mar 2020 at 12:04, Danny Chan  wrote:

> Thanks Jark for bring up this discussion, +1 for this idea, I believe the
> user has suffered from the verbose property key for long time.
>
> Just one question, how do we handle the keys with wildcard, such as the
> “connector.properties.*” in Kafka connector which would then hand-over to
> Kafka client directly. As what suggested in FLIP-95, we use a ConfigOption
> to describe the “supported properties”, then I have to concerns:
>
> • For the new keys, do we still need to put multi-lines there the such
> key, such as “connector.properties.abc” “connector.properties.def”, or
> should we inline them, such as “some-key-prefix” = “k1=v1, k2=v2 ..."
> • Should the ConfigOption support the wildcard ? (If we plan to support
> the current multi-line style)
>
>
> Best,
> Danny Chan
> 在 2020年3月31日 +0800 AM12:37,Jark Wu ,写道:
> > Hi all,
> >
> > Thanks for the feedbacks.
> >
> > It seems that we have a conclusion to put the version into the factory
> > identifier. I'm also fine with this.
> > If we have this outcome, the interface of Factory#factoryVersion is not
> > needed anymore, this can simplify the learning cost of new factory.
> > We may need to update FLIP-95 and re-vote for it? cc @Timo Walther
> > 
> >
> > kafka => kafka for 0.11+ versions, we don't suffix "-universal", because
> > the meaning of "universal" not easy to understand.
> > kafka-0.11 => kafka for 0.11 version
> > kafka-0.10 => kafka for 0.10 version
> > elasticsearch-6 => elasticsearch for 6.x versions
> > elasticsearch-7 => elasticsearch for 7.x versions
> > hbase-1.4 => hbase for 1.4.x versions
> > jdbc
> > filesystem
> >
> > We use "-" as the version delimiter to make them to be more consistent.
> > This is not forces, users can also use other delimiters or without
> > delimiter.
> > But this can be a guilde in the Javadoc of Factory, to make the connector
> > ecosystem to be more consistent.
> >
> > What do you think?
> >
> > 
> >
> > Regarding "connector.property-version":
> >
> > Hi @Dawid Wysakowicz  , the new fatories are
> > designed not support to read current properties.
> > All the current properties are routed to the old factories if they are
> > using "connector.type". Otherwise, properties are routed to new
> factories.
> >
> > If I understand correctly, the "connector.property-version" is attched
> > implicitly by system, not manually set by users.
> > For example, the framework should add "connector.property-version=1" to
> > properties when processing DDL statement.
> > I'm fine to add a "connector.property-version=1" when processing DDL
> > statement, but I think it's also fine if we don't,
> > because this can be done in the future if need and the default version
> can
> > be 1.
> >
> > Best,
> > Jark
> >
> > On Tue, 31 Mar 2020 at 00:36, Jark Wu  wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the feedbacks.
> > >
> > > It seems that we have a conclusion to put the version into the factory
> > > identifier. I'm also fine with this.
> > > If we have this outcome, the interface of Factory#factory

Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-30 Thread Jark Wu
Hi everyone,

Thanks for the great feedbacks so far.

I updated the FLIP documentation according to the discussion. Changes
include:
- remove "version" key, and merge it into "connector"
- add "scan", "lookup", "sink" prefix to some property keys if they are
only used in that case.
- add a "New Property Key" section to list all the previous property keys
and new property keys.

We use "scan" and "lookup" instead of "source" prefix because we should
distinguish them and they aligns to FLIP-95 ScanTableSource and
LookupTableSource.
I also colored red for some major change of property keys in the FLIP. I
will list some of them here too:

kafka:
connector.startup-mode => scan.startup.mode
connector.specific-offsets => scan.startup.specific-offsets
connector.startup-timestamp-millis => scan.startup.timestamp-millis
connector.sink-partitioner & connector.sink-partitioner-class =>
sink.partitioner

elasticsearch:
connector.key-delimiter => document-id.key-delimiter  # make it
explicit that it is used for document id
connector.key-null-literal => document-id.key-null-literal  # and
it also can be used for es sources in the future
connector.bulk-flush.back-off.type => sink.bulk-flush.back-off.strategy

jdbc:
connector.table => table-name

Welcome further feedbacks!

Best,
Jark


On Tue, 31 Mar 2020 at 14:45, Jark Wu  wrote:

> Hi Kurt,
>
> I also prefer "-" as version delimiter now. I didn't remove the "_"
> proposal by mistake, that's why I sent another email last night :)
> Regarding to "property-version", I also think we shouldn't let users to
> learn about this. And ConfigOption provides a good ability
> to support deprecated keys and auto-generate documentation for deprecated
> keys.
>
> Hi Danny,
>
> Regarding to “connector.properties.*”:
> In FLIP-95, the Factory#requiredOptions() and Factory#optionalOptions()
> inferfaces are only used for generation of documentation.
> It does not influence the discovery and validation of a factory. The
> validation logic is defined by connectors
> in createDynamicTableSource/Sink().
> So you don't have to provide an option for "connector.properties.*". But I
> think we should make ConfigOption support wildcard in the long term for a
> full story.
>
> I don't think we should inline all the "connector.properties.*",
> otherwise, it will be very tricky for users to configure the properties.
> Regarding to FLIP-113, I suggest to provide some ConfigOptions for
> commonly used kafka properties and put them in the supportedHintOptions(),
> e.g. "connector.properties.group.id",
> "connector.properties.fetch.min.bytes".
>
> Best,
> Jark
>
>
>
>
>
> On Tue, 31 Mar 2020 at 12:04, Danny Chan  wrote:
>
>> Thanks Jark for bring up this discussion, +1 for this idea, I believe the
>> user has suffered from the verbose property key for long time.
>>
>> Just one question, how do we handle the keys with wildcard, such as the
>> “connector.properties.*” in Kafka connector which would then hand-over to
>> Kafka client directly. As what suggested in FLIP-95, we use a ConfigOption
>> to describe the “supported properties”, then I have to concerns:
>>
>> • For the new keys, do we still need to put multi-lines there the such
>> key, such as “connector.properties.abc” “connector.properties.def”, or
>> should we inline them, such as “some-key-prefix” = “k1=v1, k2=v2 ..."
>> • Should the ConfigOption support the wildcard ? (If we plan to support
>> the current multi-line style)
>>
>>
>> Best,
>> Danny Chan
>> 在 2020年3月31日 +0800 AM12:37,Jark Wu ,写道:
>> > Hi all,
>> >
>> > Thanks for the feedbacks.
>> >
>> > It seems that we have a conclusion to put the version into the factory
>> > identifier. I'm also fine with this.
>> > If we have this outcome, the interface of Factory#factoryVersion is not
>> > needed anymore, this can simplify the learning cost of new factory.
>> > We may need to update FLIP-95 and re-vote for it? cc @Timo Walther
>> > 
>> >
>> > kafka => kafka for 0.11+ versions, we don't suffix "-universal", because
>> > the meaning of "universal" not easy to understand.
>> > kafka-0.11 => kafka for 0.11 version
>> > kafka-0.10 => kafka for 0.10 version
>> > elasticsearch-6 => elasticsearch for 6.x versions
>> > elasticsearch-7 => elasticsearch for 7.x versions
>> > hbase-1.4 => hbase for 1.4.x versions
>> > jdbc
>> >

Re: [VOTE] FLIP-110: Support LIKE clause in CREATE TABLE

2020-03-31 Thread Jark Wu
+1 from my side. This will be a very useful feature. 

Best,
Jark

> 2020年3月31日 18:15,Danny Chan  写道:
> 
> +1 for this feature, although the WITH syntax breaks the SQL standard, but 
> it’s compatible with our CREATE TABLE syntax, seems well from my side.
> 
> Best,
> Danny Chan
> 在 2020年3月31日 +0800 PM5:46,Dawid Wysakowicz ,写道:
>> Hi,
>> 
>> Just wanted to notify the voters that after a comment from Jingsong I
>> introduced a new like-option in the FLIP. Because it happened very short
>> after the vote started I will not cancel the vote (only Timo voted
>> before the changed).
>> 
>> Feel free to change your votes if you disagree. Sorry for the inconvenience.
>> 
>> Best,
>> 
>> Dawid
>> 
>> On 31/03/2020 09:43, Timo Walther wrote:
>>> +1 this will reduce manual schema work a lot!
>>> 
>>> Thanks,
>>> Timo
>>> 
>>> On 31.03.20 09:33, Dawid Wysakowicz wrote:
 Hi all,
 
 I would like to start the vote for FLIP-110 [1], which is discussed and
 reached a consensus in the discussion thread [2].
 
 The vote will be open until April 3rd (72h), unless there is an
 objection or not enough votes.
 
 Best,
 
 Dawid
 
 [1]
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-110%3A+Support+LIKE+clause+in+CREATE+TABLE
 
 [2]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-110-Support-LIKE-clause-in-CREATE-TABLE-td38378.html/
 /
 
>>> 
>> 



Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-31 Thread Jark Wu
Hi, Dawid

Regarding to `connector.property-version`,
I totally agree with you we should implicitly add a "property-version=1"
(without 'connector.' prefix) property for future evolving. So I updated
FLIP for this.
However, I still doubt to use property version to distinguish old/new
factory. Because it will break existing DDLs, unless users manually set
`connector.property-version=1` to their existing DDLs. So I still prefer to
use `connector` vs `connector.type` to distinguish old/new factory.



Hi Timo,

+1 to zookeeper.znode-parent

> sink.bulk-flush -> sink.buffer-flush?
I would like to keep using bulk-flush, because "bulk" is a well-known
Elasticsearch API and terminology [1].
I think we don't need to align all the terminologies across Flink
connectors. Following the external system's
terminology will be more easy-to-understand for connector users.

> username -> secrect.username?
That's a good idea to hide secret values in logs. However, is there a
better way to do that? For example, add a secretOptions() method to Factory?
IMO, a `secrect.` prefix is too weak and limit the design of a property
key. For example, we want to support authentication for elasticserch [2],
a possible property keys will be `authentication.enabled=true`,
`authentication.username=jark`, `authentication.password=123456`.

[1]:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
[2]: https://issues.apache.org/jira/browse/FLINK-16788


Hi Zhenghua,

> does this affect descriptors and related validators?
No. As described in the compatiblity section, all the old properties will
be routed to the old factories.
So all the current descriptors (will be translated to old property keys)
are still compatible.
But, we should have a plan to translate current descritors into new
property keys.
However, that is not in the scope of this FLIP and could be done in a
separate simple JIRA issue.

Best,
Jark

On Tue, 31 Mar 2020 at 16:08, Zhenghua Gao  wrote:

> Hi Jark,
>
> Thanks for the proposal. I'm +1 since it's more simple and clear for sql
> users.
> I have a question about this: does this affect descriptors and related
> validators?
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Mon, Mar 30, 2020 at 2:02 PM Jark Wu  wrote:
>
> > Hi everyone,
> >
> > I want to start a discussion about further improve and simplify our
> current
> > connector porperty keys, aka WITH options. Currently, we have a
> > 'connector.' prefix for many properties, but they are verbose, and we
> see a
> > big inconsistency between the properties when designing FLIP-107.
> >
> > So we propose to remove all the 'connector.' prefix and rename
> > 'connector.type' to 'connector', 'format.type' to 'format'. So a new
> Kafka
> > DDL may look like this:
> >
> > CREATE TABLE kafka_table (
> >  ...
> > ) WITH (
> >  'connector' = 'kafka',
> >  'version' = '0.10',
> >  'topic' = 'test-topic',
> >  'startup-mode' = 'earliest-offset',
> >  'properties.bootstrap.servers' = 'localhost:9092',
> >  'properties.group.id' = 'testGroup',
> >  'format' = 'json',
> >  'format.fail-on-missing-field' = 'false'
> > );
> >
> > The new connector property key set will come together with new Factory
> > inferface which is proposed in FLIP-95. Old properties are still
> compatible
> > with their existing implementation. New properties are only available in
> > new DynamicTableFactory implementations.
> >
> > You can access the detailed FLIP here:
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> >
> > Best,
> > Jark
> >
>


Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-31 Thread Jark Wu
Hi everyone,

In order to not postpone FLIP-95 further, I include the "removing
Factory#factoryVersion" in this FLIP.
I updated the "Proposed Changes" section to reflect the changes.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

Please let me know if you have other questions.

Best,
Jark


On Wed, 1 Apr 2020 at 00:56, Jark Wu  wrote:

> Hi, Dawid
>
> Regarding to `connector.property-version`,
> I totally agree with you we should implicitly add a "property-version=1"
> (without 'connector.' prefix) property for future evolving. So I updated
> FLIP for this.
> However, I still doubt to use property version to distinguish old/new
> factory. Because it will break existing DDLs, unless users manually set
> `connector.property-version=1` to their existing DDLs. So I still prefer
> to use `connector` vs `connector.type` to distinguish old/new factory.
>
> 
>
> Hi Timo,
>
> +1 to zookeeper.znode-parent
>
> > sink.bulk-flush -> sink.buffer-flush?
> I would like to keep using bulk-flush, because "bulk" is a well-known
> Elasticsearch API and terminology [1].
> I think we don't need to align all the terminologies across Flink
> connectors. Following the external system's
> terminology will be more easy-to-understand for connector users.
>
> > username -> secrect.username?
> That's a good idea to hide secret values in logs. However, is there a
> better way to do that? For example, add a secretOptions() method to Factory?
> IMO, a `secrect.` prefix is too weak and limit the design of a property
> key. For example, we want to support authentication for elasticserch [2],
> a possible property keys will be `authentication.enabled=true`,
> `authentication.username=jark`, `authentication.password=123456`.
>
> [1]:
> https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
> [2]: https://issues.apache.org/jira/browse/FLINK-16788
> 
>
> Hi Zhenghua,
>
> > does this affect descriptors and related validators?
> No. As described in the compatiblity section, all the old properties will
> be routed to the old factories.
> So all the current descriptors (will be translated to old property keys)
> are still compatible.
> But, we should have a plan to translate current descritors into new
> property keys.
> However, that is not in the scope of this FLIP and could be done in a
> separate simple JIRA issue.
>
> Best,
> Jark
>
> On Tue, 31 Mar 2020 at 16:08, Zhenghua Gao  wrote:
>
>> Hi Jark,
>>
>> Thanks for the proposal. I'm +1 since it's more simple and clear for sql
>> users.
>> I have a question about this: does this affect descriptors and related
>> validators?
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Mon, Mar 30, 2020 at 2:02 PM Jark Wu  wrote:
>>
>> > Hi everyone,
>> >
>> > I want to start a discussion about further improve and simplify our
>> current
>> > connector porperty keys, aka WITH options. Currently, we have a
>> > 'connector.' prefix for many properties, but they are verbose, and we
>> see a
>> > big inconsistency between the properties when designing FLIP-107.
>> >
>> > So we propose to remove all the 'connector.' prefix and rename
>> > 'connector.type' to 'connector', 'format.type' to 'format'. So a new
>> Kafka
>> > DDL may look like this:
>> >
>> > CREATE TABLE kafka_table (
>> >  ...
>> > ) WITH (
>> >  'connector' = 'kafka',
>> >  'version' = '0.10',
>> >  'topic' = 'test-topic',
>> >  'startup-mode' = 'earliest-offset',
>> >  'properties.bootstrap.servers' = 'localhost:9092',
>> >  'properties.group.id' = 'testGroup',
>> >  'format' = 'json',
>> >  'format.fail-on-missing-field' = 'false'
>> > );
>> >
>> > The new connector property key set will come together with new Factory
>> > inferface which is proposed in FLIP-95. Old properties are still
>> compatible
>> > with their existing implementation. New properties are only available in
>> > new DynamicTableFactory implementations.
>> >
>> > You can access the detailed FLIP here:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
>> >
>> > Best,
>> > Jark
>> >
>>
>


Re: [DISCUSS] Change default planner to blink planner in 1.11

2020-03-31 Thread Jark Wu
+1 to make blink planner as default planner.

We should give blink planner more exposure to encourage users trying out
new features and lead users to migrate to blink planner.

Glad to see blink planner is used in production since 1.9! @Benchao

Best,
Jark

On Wed, 1 Apr 2020 at 11:31, Benchao Li  wrote:

> Hi Kurt,
>
> It's excited to hear that the community aims to make Blink Planner default
> in 1.11.
>
> We have been using blink planner since 1.9 for streaming processing, it
> works very well,
> and covers many use cases in our company.
> So +1 to make it default in 1.11 from our side.
>
> Kurt Young  于2020年4月1日周三 上午9:15写道:
>
>> Hi Dev and User,
>>
>> Blink planner for Table API & SQL is introduced in Flink 1.9 and already
>> be the default planner for
>> SQL client in Flink 1.10. And since we already decided not introducing
>> any new features to the
>> original Flink planner, it already lacked of so many great features that
>> the community has been working on, such as brand new type system, more DDL
>> support and more planner capabilities.
>> During this time, we've also received lots of great feedback from users
>> who were trying to use blink
>> planner, both positive and negative (like bugs). This is a good sign, it
>> at least shows more and more
>> users are starting to try out.
>>
>> So I want to start this discussion more formally to talk about
>> replacing the default planner to blink.
>> Specifically, I want to gather feedbacks from dev and user about whether
>> blink planner already
>> cover the original planner's capabilities, what kind of issues you've ran
>> into when try out blink
>> planner and then make you fallback to original one. Since there is still
>> a month to go when feature
>> freeze, there's still enough time for community to further enhance blink
>> planner for this purpose.
>>
>> Let me know what you think, especially if you want to report or complain
>> about something. Thanks
>> in advance.
>>
>> Best,
>> Kurt
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-04-01 Thread Jark Wu
Hi everyone,

If there are no objections, I would like to start a voting thread by
tomorrow. So this is the last call to give feedback for FLIP-122.

Cheers,
Jark

On Wed, 1 Apr 2020 at 16:30, zoudan  wrote:

> Hi Jark,
> Thanks for the proposal.
> I like the idea that we put the version in ‘connector’ field. That will be
> friendly for existing jobs as some of existing connectors may do not
> contains  ‘connector.version’.
>
> Best,
> Dan Zou
>
>
>


Re: [ANNOUNCE] New Committers and PMC member

2020-04-01 Thread Jark Wu
Congratulations to you all!

Best,
Jark

On Wed, 1 Apr 2020 at 20:33, Kurt Young  wrote:

> Congratulations to you all!
>
> Best,
> Kurt
>
>
> On Wed, Apr 1, 2020 at 7:41 PM Danny Chan  wrote:
>
> > Congratulations!
> >
> > Best,
> > Danny Chan
> > 在 2020年4月1日 +0800 PM7:36,dev@flink.apache.org,写道:
> > >
> > > Congratulations!
> >
>


Re: [VOTE] FLIP-95: New TableSource and TableSink interfaces

2020-04-02 Thread Jark Wu
Hi Dawid,

> How to express projections with TableSchema?
The TableSource holds the original TableSchema (i.e. from DDL) and the
pushed TableSchema represents the schema after projection.
Thus the table source can compare them to figure out changed field orders
or not matched types.
For most sources who maps physical storage by field names (e.g. jdbc,
hbase, json) they can just simply apply the pushed TableSchema.
But sources who maps by field indexes (e.g. csv), they need to figure out
the projected indexes by comparing the original and projected schema.
For example, the original schema is [a: String, b: Int, c: Timestamp], and
b is pruned, then the pushed schema is [a: String, c: Timestamp]. So the
source can figure out index=1 is pruned.

> How do we express projection of a nested field with TableSchema?
This is the same to the above one. For example, the original schema is [rk:
String, f1 Row].
If `f1.q1` is pruned, the pushed schema will be [rk: String, f1 Row].

> TableSchema might be used at too many different places for different
responsibilities.
Agree. We have recognized that a structure and builder for pure table
schema is required in many places. But we mixed many concepts of catalog
table schema in TableSchema.
IIRC, in an offline discussion of FLIP-84, we want to introduce a new
`CatalogTableSchema` to represent the schema part of a DDL,
and remove all the watermark, computed column information from TableSchema?
Then `TableSchema` can continue to serve as a pure table schema and it
stays in a good package.

Best,
Jark




On Thu, 2 Apr 2020 at 19:39, Timo Walther  wrote:

> Hi Dawid,
>
> thanks for your feedback. I agree with your concerns. I also observed
> that TableSchema might be used at too many different places for
> different responsibilities.
>
> How about we introduce a helper class for `SupportsProjectionPushDown`
> and also `LookupTableSource#Context#getKeys()` to represent nested
> structure of names. Data types, constraints, or computed columns are not
> necessary at those locations.
>
> We can also add utility methods for connectors to this helper class
> there to quickly figuring out differences between the original table
> schema and the new one.
>
> SelectedFields {
>
> private LogicalType orignalRowType; // set by the planner
>
> private int[][] indices;
>
> getNames(int... at): String[]
>
> getNames(String... at): String[]
>
> getIndices(int... at): int[]
>
> getNames(String... at): String[]
>
> toTableSchema(): TableSchema
> }
>
> What do others think?
>
> Thanks,
> Timo
>
>
>
> On 02.04.20 12:28, Dawid Wysakowicz wrote:
> > Generally +1
> >
> > One slight concern I have is about the |SupportsProjectionPushDown.|I
> > don't necessarily understand how can we express projections with
> > TableSchema. It's unclear for me what happens when a type of a field
> > changes, fields are in a different order, when types do not match. How
> > do we express projection of a nested field with TableSchema?
> >
> > I don't think this changes the core design presented in the FLIP,
> > therefore I'm fine with accepting the FLIP. I wanted to mention my
> > concerns, so that maybe we can adjust the passed around structures
> slightly.
> >
> > Best,
> >
> > Dawid
> > ||
> >
> > On 30/03/2020 14:42, Leonard Xu wrote:
> >> +1(non-binding)
> >>
> >> Best,
> >> Leonard Xu
> >>
> >>> 在 2020年3月30日,16:43,Jingsong Li  写道:
> >>>
> >>> +1
> >>>
> >>> Best,
> >>> Jingsong Lee
> >>>
> >>> On Mon, Mar 30, 2020 at 4:41 PM Kurt Young  wrote:
> >>>
> >>>> +1
> >>>>
> >>>> Best,
> >>>> Kurt
> >>>>
> >>>>
> >>>> On Mon, Mar 30, 2020 at 4:08 PM Benchao Li
> wrote:
> >>>>
> >>>>> +1 (non-binding)
> >>>>>
> >>>>> Jark Wu  于2020年3月30日周一 下午3:57写道:
> >>>>>
> >>>>>> +1 from my side.
> >>>>>>
> >>>>>> Thanks Timo for driving this.
> >>>>>>
> >>>>>> Best,
> >>>>>> Jark
> >>>>>>
> >>>>>> On Mon, 30 Mar 2020 at 15:36, Timo Walther
> wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> I would like to start the vote for FLIP-95 [1], which is discussed
> >>>> and
> >>>>>>> reached a consensus in the discussion thread [2].
> >>>>>>>
> >>>>>>> The vote will be open until April 2nd (72h), unless there is an
> >>>>>>> objection or not enough votes.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>>>
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >>>>>>> [2]
> >>>>>>>
> >>>>>>>
> >>>>
> https://lists.apache.org/thread.html/r03cbce8996fd06c9b0406c9ddc0d271bd456f943f313b9261fa061f9%40%3Cdev.flink.apache.org%3E
> >>>>> --
> >>>>>
> >>>>> Benchao Li
> >>>>> School of Electronics Engineering and Computer Science, Peking
> University
> >>>>> Tel:+86-15650713730
> >>>>> Email:libenc...@gmail.com;libenc...@pku.edu.cn
> >>>>>
> >>> --
> >>> Best, Jingsong Lee
>
>


Re: [VOTE] FLIP-95: New TableSource and TableSink interfaces

2020-04-02 Thread Jark Wu
Hi Timo,

I don't think source should work with `CatalogTableSchema`. So far, a table
source doesn't need to know the logic information of computed column and
watermark.
IMO, we should provide a method to convert from `CatalogTableSchema` into
`TableSchema` without computed columns in source factory,
and a source should just hold the `TableSchema`.

I agree doing the intersection/diff logic is trivial, but maybe we can
provide utilities to do that? So that we can keep the interface clean.


Best,
Jark


On Thu, 2 Apr 2020 at 20:17, Timo Walther  wrote:

> Hi Jark,
>
> if catalogs use `CatalogTableSchema` in the future. The source would
> internally also work with `CatalogTableSchema`. I'm fine with cleaning
> up the `TableSchema` class but should a source deal with two different
> schema classes then?
>
> Another problem that I see is that connectors usually need to perform
> some index arithmetics. Dealing with TableSchema and additionally within
> a field with DataType might be a bit inconvenient. A dedicated class
> with utilities might be helpful such that not every source needs to
> implement the same intersection/diff logic again.
>
> Regards,
> Timo
>
>
> On 02.04.20 14:06, Jark Wu wrote:
> > Hi Dawid,
> >
> >> How to express projections with TableSchema?
> > The TableSource holds the original TableSchema (i.e. from DDL) and the
> > pushed TableSchema represents the schema after projection.
> > Thus the table source can compare them to figure out changed field orders
> > or not matched types.
> > For most sources who maps physical storage by field names (e.g. jdbc,
> > hbase, json) they can just simply apply the pushed TableSchema.
> > But sources who maps by field indexes (e.g. csv), they need to figure out
> > the projected indexes by comparing the original and projected schema.
> > For example, the original schema is [a: String, b: Int, c: Timestamp],
> and
> > b is pruned, then the pushed schema is [a: String, c: Timestamp]. So the
> > source can figure out index=1 is pruned.
> >
> >> How do we express projection of a nested field with TableSchema?
> > This is the same to the above one. For example, the original schema is
> [rk:
> > String, f1 Row].
> > If `f1.q1` is pruned, the pushed schema will be [rk: String, f1 Row > Double>].
> >
> >> TableSchema might be used at too many different places for different
> > responsibilities.
> > Agree. We have recognized that a structure and builder for pure table
> > schema is required in many places. But we mixed many concepts of catalog
> > table schema in TableSchema.
> > IIRC, in an offline discussion of FLIP-84, we want to introduce a new
> > `CatalogTableSchema` to represent the schema part of a DDL,
> > and remove all the watermark, computed column information from
> TableSchema?
> > Then `TableSchema` can continue to serve as a pure table schema and it
> > stays in a good package.
> >
> > Best,
> > Jark
> >
> >
> >
> >
> > On Thu, 2 Apr 2020 at 19:39, Timo Walther  wrote:
> >
> >> Hi Dawid,
> >>
> >> thanks for your feedback. I agree with your concerns. I also observed
> >> that TableSchema might be used at too many different places for
> >> different responsibilities.
> >>
> >> How about we introduce a helper class for `SupportsProjectionPushDown`
> >> and also `LookupTableSource#Context#getKeys()` to represent nested
> >> structure of names. Data types, constraints, or computed columns are not
> >> necessary at those locations.
> >>
> >> We can also add utility methods for connectors to this helper class
> >> there to quickly figuring out differences between the original table
> >> schema and the new one.
> >>
> >> SelectedFields {
> >>
> >>  private LogicalType orignalRowType; // set by the planner
> >>
> >>  private int[][] indices;
> >>
> >>  getNames(int... at): String[]
> >>
> >>  getNames(String... at): String[]
> >>
> >>  getIndices(int... at): int[]
> >>
> >>  getNames(String... at): String[]
> >>
> >>  toTableSchema(): TableSchema
> >> }
> >>
> >> What do others think?
> >>
> >> Thanks,
> >> Timo
> >>
> >>
> >>
> >> On 02.04.20 12:28, Dawid Wysakowicz wrote:
> >>> Generally +1
> >>>
> >>> One slight concern I have is about the |SupportsProjectionPushDown.|I
> >>

[VOTE] FLIP-122: New Connector Property Keys for New Factory

2020-04-02 Thread Jark Wu
Hi all,

I would like to start the vote for FLIP-122 [1], which is discussed and
reached a consensus in the discussion thread [2].

The vote will be open for at least 72h, unless there is an objection or not
enough votes.

Thanks,
Timo

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-122-New-Connector-Property-Keys-for-New-Factory-td39462.html


Re: [DISCUSS]FLIP-113: Support SQL and planner hints

2020-04-06 Thread Jark Wu
I'm fine to disable this feature by default and avoid
whitelisting/blacklisting. This simplifies a lot of things.

Regarding to TableSourceFactory#Context#getExecutionOptions, do we really
need this interface?
Should the connector factory be aware of the properties is merged with
hints or not?
What's the problem if we always get properties from
`CatalogTable#getProperties`?

Best,
Jark

On Tue, 7 Apr 2020 at 10:39, Kurt Young  wrote:

> Sounds like a reasonable compromise, disabling this feature by default is a
> way to protect
> the vulnerability, and we can simplify the design quite a lot. We can
> gather some users'
> feedback to see whether further protections are necessary in the future.
>
> Best,
> Kurt
>
>
> On Mon, Apr 6, 2020 at 11:49 PM Timo Walther  wrote:
>
> > I agree with Aljoscha. The length of this thread shows that this is
> > highly controversal. I think nobody really likes this feature 100% but
> > we could not find a better solution. I would consider it as a
> > nice-to-have improvement during a notebook/debugging session.
> >
> > I would accept avoiding whitelisting/blacklisting if the feature is
> > disabled by default. And we make the merged properties available in a
> > separate TableSourceFactory#Context#getExecutionOptions as Danny
> proposed.
> >
> > What do you think?
> >
> > Thanks,
> > Timo
> >
> >
> > On 06.04.20 09:59, Aljoscha Krettek wrote:
> > > The reason I'm saying it should be disabled by default is that this
> uses
> > > hint syntax, and hints should really not change query semantics.
> > >
> > > I'm quite strongly against hints that change query semantics, but if we
> > > disable this by default I would be (reluctantly) OK with the feature.
> > > Companies that create deployments or set up the SQL environment for
> > > users can enable the feature if they want.
> > >
> > > But yes, I also agree that we don't need whitelisting/blacklisting,
> > > which makes this a lot easier to do.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > On 06.04.20 04:27, Danny Chan wrote:
> > >> Hi, everyone ~
> > >>
> > >> @Aljoscha @Timo
> > >>
> > >>> I think we're designing ourselves into ever more complicated corners
> > >> here
> > >>
> > >> I kindly agree that, personally didn't see strong reasons why we
> > >> should limit on each connector properties:
> > >>
> > >> • we can define any table options for CREATE TABLE, why we treat the
> > >> dynamic options differently, we never consider any security problems
> > >> when create table, we should not either for dynamic table options
> > >> • If we do not have whitelist properties or blacklist properties, the
> > >> table source creation work would be much easier, just used the merged
> > >> options. There is no need to modify each connector to decide which
> > >> options could be overridden and how we merge them(the merge work is
> > >> redundant).
> > >> • @Timo, how about we support another interface
> > >> `TableSourceFactory#Context.getExecutionOptions`, we always use this
> > >> interface to get the options to create our table source. There is no
> > >> need to copy the catalog table itselt, we just need to generate our
> > >> Context correctly.
> > >> • @Aljoscha I agree to have a global config option, but I disagree to
> > >> default disable it, a global default config would break the user
> > >> experience too much, especially when user want to modify the options
> > >> in a ad-hoc way.
> > >>
> > >>
> > >>
> > >> I suggest to remove `TableSourceFactory#supportedHintOptions` or
> > >> `TableSourceFactory#forbiddenHintOptions` based on the fact that we
> > >> does not have black/white list for CREATE TABLE at all at lease for
> > >> current codebase.
> > >>
> > >>
> > >> @Timo (i have replied offline but allows to represent it here again)
> > >>
> > >> The `TableSourceFactory#supportedHintOptions` doesn't work well for 3
> > >> reasons compared to `TableSourceFactory#forbiddenHintOptions`:
> > >> 1. For key with wildcard, like connector.property.* , use a blacklist
> > >> make us have the ability to disable some of the keys under that, i.e.
> > >> connector.property.key1 , a whitelist can only match with prefix
> > >>
> > >> 2. We want the connectors to have the ability to disable format type
> > >> switch format.type but allows all the other properties, e.g. format.*
> > >> without format.type(let's call it SET_B), if we use the whitelist, we
> > >> have to enumerate all the specific format keys start with format
> > >> (SET_B), but with the old connector factories, we have no idea what
> > >> specific format keys it supports(there is either a format.* or
> nothing).
> > >>
> > >> 3. Except the cases for 1 and 2, for normal keys(no wildcard), the
> > >> blacklist and whitelist has the same expressiveness, use blacklist
> > >> makes the code not too verbose to enumerate all the duplicate keys
> > >> with #supportedKeys .(Not very strong reason, but i think as a
> > >> connector developer, it makes sense)
> > >>
> > >> Best,
> > >> 

Re: [DISCUSS] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-06 Thread Jark Wu
Hi everyone,

Since this FLIP was proposed, the community has discussed a lot about the
first approach: introducing new TableSource and TableSink interfaces to
support changelog.
And yes, that is FLIP-95 which has been accepted last week. So most of the
work has been merged into FLIP-95.

In order to support the goal of FLIP-105, there is still a little things to
discuss: how to connect external CDC formats.
We propose to introduce 2 new formats: Debezium format and Canal format.
They are the most popular CDC tools according to the survey in user [1] and
user-zh [2] mailing list.

I have updated the FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL

Welcome feedbacks!

Best,
Jark

[1]:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SURVEY-What-Change-Data-Capture-tools-are-you-using-td33569.html
[2]: http://apache-flink.147419.n8.nabble.com/SURVEY-CDC-td1910.html


On Fri, 14 Feb 2020 at 22:08, Jark Wu  wrote:

> Hi everyone,
>
> I would like to start discussion about how to support interpreting
> external changelog into Flink SQL, and how to emit changelog from Flink SQL.
>
> This topic has already been mentioned several times in the past. CDC
> (Change Data Capture) data has been a very important streaming data in the
> world. Connect to CDC is a significant feature for Flink, it fills the
> missing piece for Flink's streaming processing.
>
> In FLIP-105, we propose 2 approaches to achieve.
> One is introducing new TableSource interface (higher priority),
> the other is introducing new SQL syntax to interpret and emit changelog.
>
> FLIP-105:
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#
>
> Thanks for any feedback!
>
> Best,
> Jark
>


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-06 Thread Jark Wu
Hi Dawid,

Thanks for driving this. This is a blocker to support Debezium CDC format
(FLIP-105). So big +1 from my side.

Regarding to emitting multiple records and checkpointing, I'm also in favor
of option#1: buffer all the records outside of the checkpoint lock.
I think most of the use cases will not buffer larger data than
it's deserialized byte[].

I have a minor suggestion on DeserializationSchema: could we have a default
implementation (maybe throw exception) for `T deserialize(byte[] message)`?
I think this will not break compatibility, and users don't have to
implement this deprecated interface if he/she wants to use the new
collector interface.
I think SinkFunction also did this in the same way: introduce a new invoke
method with Context parameter, and give the old invoke method an
empty implemention.

Best,
Jark

On Mon, 6 Apr 2020 at 23:51, Seth Wiesman  wrote:

> I would be in favor of buffering data outside of the checkpoint lock. In my
> experience, serialization is always the biggest performance killer in user
> code and I have a hard time believing in practice that anyone is going to
> buffer so many records that is causes real memory concerns.
>
> To add to Timo's point,
>
> Statefun actually did that on its Kinesis ser/de interfaces[1,2].
>
> Seth
>
> [1]
>
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
> [2]
>
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
>
>
> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther  wrote:
>
> > Hi Dawid,
> >
> > thanks for this FLIP. This solves a lot of issues with the current
> > design for both the Flink contributors and users. +1 for this.
> >
> > Some minor suggestions from my side:
> > - How about finding something shorter for `InitializationContext`? Maybe
> > just `OpenContext`?
> > - While introducing default methods for existing interfaces, shall we
> > also create contexts for those methods? I see the following method in
> > your FLIP and wonder if we can reduce the number of parameters while
> > introducing a new method:
> >
> > deserialize(
> >  byte[] recordValue,
> >  String partitionKey,
> >  String seqNum,
> >  long approxArrivalTimestamp,
> >  String stream,
> >  String shardId,
> >  Collector out)
> >
> > to:
> >
> > deserialize(
> >  byte[] recordValue,
> >  Context c,
> >  Collector out)
> >
> > What do you think?
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 06.04.20 11:08, Dawid Wysakowicz wrote:
> > > Hi devs,
> > >
> > > When working on improving the Table API/SQL connectors we faced a few
> > > shortcomings of the DeserializationSchema and SerializationSchema
> > > interfaces. Similar features were also mentioned by other users in the
> > > past. The shortcomings I would like to address with the FLIP include:
> > >
> > >   * Emitting 0 to m records from the deserialization schema with per
> > > partition watermarks
> > >   o
> https://github.com/apache/flink/pull/3314#issuecomment-376237266
> > >   o differentiate null value from no value
> > >   o support for Debezium CDC format
> > > (
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
> > )
> > >
> > >   * A way to initialize the schema
> > >   o establish external connections
> > >   o generate code on startup
> > >   o no need for lazy initialization
> > >
> > >   * Access to metrics
> > > [
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329
> > ]
> > >
> > > One important aspect I would like to hear your opinion on is how to
> > > support the Collector interface in Kafka source. Of course if we agree
> > > to add the Collector to the DeserializationSchema.
> > >
> > > The FLIP can be found here:
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode
> > >
> > > Looking forward to your feedback.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> >
> >
>


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-06 Thread Jark Wu
Thanks for the explanation. Sounds good to me.

Best,
Jark

On Tue, 7 Apr 2020 at 14:45, Dawid Wysakowicz 
wrote:

> Hi all,
>
> @Timo I'm fine with OpenContext.
>
> @Timo @Seth Sure we can combine all the parameters in a single object.
> Will update the FLIP
>
> @Jark I was aware of the implementation of SinkFunction, but it was a
> conscious choice to not do it that way.
>
> Personally I am against giving a default implementation to both the new
> and old methods. This results in an interface that by default does
> nothing or notifies the user only in the runtime, that he/she has not
> implemented a method of the interface, which does not sound like a good
> practice to me. Moreover I believe the method without a Collector will
> still be the preferred method by many users. Plus it communicates
> explicitly what is the minimal functionality required by the interface.
> Nevertheless I am happy to hear other opinions.
>
> @all I also prefer the buffering approach. Let's wait a day or two more
> to see if others think differently.
>
> Best,
>
> Dawid
>
> On 07/04/2020 06:11, Jark Wu wrote:
> > Hi Dawid,
> >
> > Thanks for driving this. This is a blocker to support Debezium CDC format
> > (FLIP-105). So big +1 from my side.
> >
> > Regarding to emitting multiple records and checkpointing, I'm also in
> favor
> > of option#1: buffer all the records outside of the checkpoint lock.
> > I think most of the use cases will not buffer larger data than
> > it's deserialized byte[].
> >
> > I have a minor suggestion on DeserializationSchema: could we have a
> default
> > implementation (maybe throw exception) for `T deserialize(byte[]
> message)`?
> > I think this will not break compatibility, and users don't have to
> > implement this deprecated interface if he/she wants to use the new
> > collector interface.
> > I think SinkFunction also did this in the same way: introduce a new
> invoke
> > method with Context parameter, and give the old invoke method an
> > empty implemention.
> >
> > Best,
> > Jark
> >
> > On Mon, 6 Apr 2020 at 23:51, Seth Wiesman  wrote:
> >
> >> I would be in favor of buffering data outside of the checkpoint lock.
> In my
> >> experience, serialization is always the biggest performance killer in
> user
> >> code and I have a hard time believing in practice that anyone is going
> to
> >> buffer so many records that is causes real memory concerns.
> >>
> >> To add to Timo's point,
> >>
> >> Statefun actually did that on its Kinesis ser/de interfaces[1,2].
> >>
> >> Seth
> >>
> >> [1]
> >>
> >>
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
> >> [2]
> >>
> >>
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
> >>
> >>
> >> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther  wrote:
> >>
> >>> Hi Dawid,
> >>>
> >>> thanks for this FLIP. This solves a lot of issues with the current
> >>> design for both the Flink contributors and users. +1 for this.
> >>>
> >>> Some minor suggestions from my side:
> >>> - How about finding something shorter for `InitializationContext`?
> Maybe
> >>> just `OpenContext`?
> >>> - While introducing default methods for existing interfaces, shall we
> >>> also create contexts for those methods? I see the following method in
> >>> your FLIP and wonder if we can reduce the number of parameters while
> >>> introducing a new method:
> >>>
> >>> deserialize(
> >>>  byte[] recordValue,
> >>>  String partitionKey,
> >>>  String seqNum,
> >>>  long approxArrivalTimestamp,
> >>>  String stream,
> >>>  String shardId,
> >>>  Collector out)
> >>>
> >>> to:
> >>>
> >>> deserialize(
> >>>  byte[] recordValue,
> >>>  Context c,
> >>>  Collector out)
> >>>
> >>> What do you think?
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>>
> >>> On 

Re: [VOTE] FLIP-122: New Connector Property Keys for New Factory

2020-04-07 Thread Jark Wu
+1 (binding)

Best,
Jark

On Sun, 5 Apr 2020 at 16:38, zoudan  wrote:

> +1 (non-binding)
>
> Best,
> Dan Zou
>
>
> > 在 2020年4月3日,10:02,LakeShen  写道:
> >
> > +1 (non-binding)
>
>


[RESULT][VOTE] FLIP-122: New Connector Property Keys for New Factory

2020-04-07 Thread Jark Wu
Hi all,

The voting time for FLIP-122 has passed. I'm closing the vote now.

There were 8 +1 votes, 4 of which are binding:

- Timo (binding)
- Dawid (binding)
- Benchao Li (non-binding)
- Jingsong Li (binding)
- LakeShen (non-binding)
- Leonard Xu (non-binding)
- zoudan (non-binding)
- Jark (binding)

There were no disapproving votes.

Thus, FLIP-122 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

Best,
Jark


Re: [DISCUSS]FLIP-113: Support SQL and planner hints

2020-04-07 Thread Jark Wu
Thanks for the summary Danny. +1 to the new proposal.

I have a minor concern about the global configuration
`table.optimizer.dynamic-table-options.enabled`, does it belong to
optimizer?
>From my point of view, it is just an API to set table options and uses
Calcite in the implementation.
I'm also thinking about what's the name of other configurations, e.g
time-zone, code-gen length, state ttl.
Should they prefix with "optimizer" or "exec" or something else or nothing?

Best,
Jark

On Tue, 7 Apr 2020 at 23:17, Timo Walther  wrote:

> Thanks for the update Danny. +1 from my side.
>
> Regards,
> Timo
>
>
> On 07.04.20 13:25, Danny Chan wrote:
> > Hi, every ~
> >
> > It seems that we all agree to drop the idea for white/black list for each
> > connector, and have a global config option to default disable this
> feature.
> >
> > I have also discussed with Timo and Jark about the interface
> > TableSourceTable.Context.getExecutionOptions and finally we decide to
> > introduce a new interface CatalogTable#copy(Map) to
> support
> > re-generate the table with new table options.
> >
> > So let me summarize the current design broadly again:
> >
> > - Use the syntax /*+ OPTIONS('k1' = 'v1', 'k2' = 'v2') to describe
> the
> > dynamic table options
> > - There is no constraint on which option key can be used in the
> OPTIONS,
> > that means, any option key is allowed, the factory would to the
> validation
> > work finally
> > - Introduce method CatalogTable#copy, we use this method to
> regenerate a
> > new CatalogTable to find a table factory and creates table
> source/sink
> > - There is a global config option to default disable this feature (if
> > user uses OPTIONS, an exception throws to tell open the option)
> >
> > I have updated the WIKI
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
> >,
> > look forward to your suggestions ~
> >
> > Jark Wu  于2020年4月7日周二 上午11:24写道:
> >
> >> I'm fine to disable this feature by default and avoid
> >> whitelisting/blacklisting. This simplifies a lot of things.
> >>
> >> Regarding to TableSourceFactory#Context#getExecutionOptions, do we
> really
> >> need this interface?
> >> Should the connector factory be aware of the properties is merged with
> >> hints or not?
> >> What's the problem if we always get properties from
> >> `CatalogTable#getProperties`?
> >>
> >> Best,
> >> Jark
> >>
> >> On Tue, 7 Apr 2020 at 10:39, Kurt Young  wrote:
> >>
> >>> Sounds like a reasonable compromise, disabling this feature by default
> >> is a
> >>> way to protect
> >>> the vulnerability, and we can simplify the design quite a lot. We can
> >>> gather some users'
> >>> feedback to see whether further protections are necessary in the
> future.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Mon, Apr 6, 2020 at 11:49 PM Timo Walther 
> wrote:
> >>>
> >>>> I agree with Aljoscha. The length of this thread shows that this is
> >>>> highly controversal. I think nobody really likes this feature 100% but
> >>>> we could not find a better solution. I would consider it as a
> >>>> nice-to-have improvement during a notebook/debugging session.
> >>>>
> >>>> I would accept avoiding whitelisting/blacklisting if the feature is
> >>>> disabled by default. And we make the merged properties available in a
> >>>> separate TableSourceFactory#Context#getExecutionOptions as Danny
> >>> proposed.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Thanks,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 06.04.20 09:59, Aljoscha Krettek wrote:
> >>>>> The reason I'm saying it should be disabled by default is that this
> >>> uses
> >>>>> hint syntax, and hints should really not change query semantics.
> >>>>>
> >>>>> I'm quite strongly against hints that change query semantics, but if
> >> we
> >>>>> disable this by default I would be (reluctantly) OK with the feature.
> >>>>> Companies that create deployments or set up the SQL environment for
> >

Re: [DISCUSS] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-08 Thread Jark Wu
Hi Kurt,

The JSON encoding of Debezium can be configured to include or exclude the
message schema using the `value.converter.schemas.enable` properties.
That's why we propose to have a `format.schema-include` property key to
config how to parse the json.

Besides, the encoding format of debezium is stable and unified across
different databases (MySQL, Oracle, SQL Server, DB2, PostgresSQL).
However, because of the limitation of some special databases, some
databases CDC encoding are different (Cassandra and MongoDB).
If we want to support them in the future, we can introduce an optional
property key, e.g. `format.encoding-connector=mongodb`, to recognize this
special encoding.

Canal currently only support to capture changes from MySQL, so there is
only one encoding in Canal. But both Canal and Debezium may evolve their
encoding in the future.
We can also introduce a `format.encoding-version` in the future if needed.

Best,
Jark


On Wed, 8 Apr 2020 at 14:26, Kurt Young  wrote:

> One minor comment, is there any other encoding or format in debezium? I'm
> asking because the format
> name is debezium-json, i'm wondering whether debezium is enough. This also
> applies to canal.
>
> Best,
> Kurt
>
>
> On Tue, Apr 7, 2020 at 11:49 AM Jark Wu  wrote:
>
> > Hi everyone,
> >
> > Since this FLIP was proposed, the community has discussed a lot about the
> > first approach: introducing new TableSource and TableSink interfaces to
> > support changelog.
> > And yes, that is FLIP-95 which has been accepted last week. So most of
> the
> > work has been merged into FLIP-95.
> >
> > In order to support the goal of FLIP-105, there is still a little things
> to
> > discuss: how to connect external CDC formats.
> > We propose to introduce 2 new formats: Debezium format and Canal format.
> > They are the most popular CDC tools according to the survey in user [1]
> and
> > user-zh [2] mailing list.
> >
> > I have updated the FLIP:
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
> >
> > Welcome feedbacks!
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SURVEY-What-Change-Data-Capture-tools-are-you-using-td33569.html
> > [2]: http://apache-flink.147419.n8.nabble.com/SURVEY-CDC-td1910.html
> >
> >
> > On Fri, 14 Feb 2020 at 22:08, Jark Wu  wrote:
> >
> > > Hi everyone,
> > >
> > > I would like to start discussion about how to support interpreting
> > > external changelog into Flink SQL, and how to emit changelog from Flink
> > SQL.
> > >
> > > This topic has already been mentioned several times in the past. CDC
> > > (Change Data Capture) data has been a very important streaming data in
> > the
> > > world. Connect to CDC is a significant feature for Flink, it fills the
> > > missing piece for Flink's streaming processing.
> > >
> > > In FLIP-105, we propose 2 approaches to achieve.
> > > One is introducing new TableSource interface (higher priority),
> > > the other is introducing new SQL syntax to interpret and emit
> changelog.
> > >
> > > FLIP-105:
> > >
> >
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#
> > >
> > > Thanks for any feedback!
> > >
> > > Best,
> > > Jark
> > >
> >
>


Re: [DISCUSS]FLIP-113: Support SQL and planner hints

2020-04-08 Thread Jark Wu
`table.dynamic-table-options.enabled` and `TableConfigOptions` sounds good
to me.

Best,
Jark

On Wed, 8 Apr 2020 at 18:59, Danny Chan  wrote:

> `table.dynamic-table-options.enabled` seems fine to me, I would make a new
> `TableConfigOptions` class and put the config option there ~
>
> What do you think about the new class to put ?
>
> Best,
> Danny Chan
> 在 2020年4月8日 +0800 PM5:33,dev@flink.apache.org,写道:
> >
> > `table.dynamic-table-options.enabled`
>


Re: Configuring autolinks to Flink JIRA ticket in github repos

2020-04-09 Thread Jark Wu
Thanks Yun,

This's a great feature! I was surprised by the autolink feature yesterday
(didn't know your work at that time).

Best,
Jark

On Thu, 9 Apr 2020 at 16:12, Yun Tang  wrote:

> Hi community
>
> The autolink to Flink JIRA ticket has taken effect. You could refer to the
> commit details page[1] to see all Flink JIRA titles within commits has the
> hyper link underline. Moreover, you don't need to use markdown language to
> create hyper link to Flink JIRA ticket when discussing in the pull
> requests. e.g FLINK-16850 could point to the link instead of [FLINK-16850](
> https://issues.apache.org/jira/browse/FLINK-16850)
>
>
> [1] https://github.com/apache/flink/commits/master
>
> Best
> Yun Tang
>
> 
> From: Till Rohrmann 
> Sent: Thursday, April 2, 2020 23:11
> To: dev 
> Subject: Re: Configuring autolinks to Flink JIRA ticket in github repos
>
> Nice, this is a cool feature. Thanks for asking INFRA for it.
>
> Cheers,
> Till
>
> On Wed, Apr 1, 2020 at 6:52 PM Yun Tang  wrote:
>
> > Hi community.
> >
> > I noticed that Github supports autolink reference recently [1]. This is
> > helpful to allow developers could open Jira ticket link from pull
> requests
> > title directly when accessing github repo.
> >
> > I have already created INFRA-20055 [2] to ask for configuration for seven
> > Flink related github repos. Hope it could be resolved soon 🙂
> >
> >
> > [1]
> >
> https://help.github.com/en/github/administering-a-repository/configuring-autolinks-to-reference-external-resources
> > [2] https://issues.apache.org/jira/browse/INFRA-20055
> >
> > Best
> > Yun Tang
> >
>


Re: [DISCUSS] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-10 Thread Jark Wu
Hi,

After a short offline discussion with Kurt, It seems that I misunderstood
Kurt's meaning.
Kurt meant: is `format=debezium` is enough, or split into two options
`format=debezium` and `format.encoding=json`.

Debezium not only support JSON encoding, but also Avro. Canal supports JSON
and Protobuf. So a single `format=debezium` is not enough (in the long
term).
The reason I proposed a single option `format=debezium-json` instead of two:
 - It's simpler to write a single option instead of two, we also make this
design decision for "connector" and "version".
 - I didn't find a good name for separate option keys, because JSON is also
a format, not an encoding, but `format.format=json` is weird.

Hi everyone,

If there are no further concerns, I would like to start a voting thread by
tomorrow.

Best,
Jark



On Wed, 8 Apr 2020 at 15:37, Jark Wu  wrote:

> Hi Kurt,
>
> The JSON encoding of Debezium can be configured to include or exclude the
> message schema using the `value.converter.schemas.enable` properties.
> That's why we propose to have a `format.schema-include` property key to
> config how to parse the json.
>
> Besides, the encoding format of debezium is stable and unified across
> different databases (MySQL, Oracle, SQL Server, DB2, PostgresSQL).
> However, because of the limitation of some special databases, some
> databases CDC encoding are different (Cassandra and MongoDB).
> If we want to support them in the future, we can introduce an optional
> property key, e.g. `format.encoding-connector=mongodb`, to recognize this
> special encoding.
>
> Canal currently only support to capture changes from MySQL, so there is
> only one encoding in Canal. But both Canal and Debezium may evolve their
> encoding in the future.
> We can also introduce a `format.encoding-version` in the future if needed.
>
> Best,
> Jark
>
>
> On Wed, 8 Apr 2020 at 14:26, Kurt Young  wrote:
>
>> One minor comment, is there any other encoding or format in debezium? I'm
>> asking because the format
>> name is debezium-json, i'm wondering whether debezium is enough. This also
>> applies to canal.
>>
>> Best,
>> Kurt
>>
>>
>> On Tue, Apr 7, 2020 at 11:49 AM Jark Wu  wrote:
>>
>> > Hi everyone,
>> >
>> > Since this FLIP was proposed, the community has discussed a lot about
>> the
>> > first approach: introducing new TableSource and TableSink interfaces to
>> > support changelog.
>> > And yes, that is FLIP-95 which has been accepted last week. So most of
>> the
>> > work has been merged into FLIP-95.
>> >
>> > In order to support the goal of FLIP-105, there is still a little
>> things to
>> > discuss: how to connect external CDC formats.
>> > We propose to introduce 2 new formats: Debezium format and Canal format.
>> > They are the most popular CDC tools according to the survey in user [1]
>> and
>> > user-zh [2] mailing list.
>> >
>> > I have updated the FLIP:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
>> >
>> > Welcome feedbacks!
>> >
>> > Best,
>> > Jark
>> >
>> > [1]:
>> >
>> >
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SURVEY-What-Change-Data-Capture-tools-are-you-using-td33569.html
>> > [2]: http://apache-flink.147419.n8.nabble.com/SURVEY-CDC-td1910.html
>> >
>> >
>> > On Fri, 14 Feb 2020 at 22:08, Jark Wu  wrote:
>> >
>> > > Hi everyone,
>> > >
>> > > I would like to start discussion about how to support interpreting
>> > > external changelog into Flink SQL, and how to emit changelog from
>> Flink
>> > SQL.
>> > >
>> > > This topic has already been mentioned several times in the past. CDC
>> > > (Change Data Capture) data has been a very important streaming data in
>> > the
>> > > world. Connect to CDC is a significant feature for Flink, it fills the
>> > > missing piece for Flink's streaming processing.
>> > >
>> > > In FLIP-105, we propose 2 approaches to achieve.
>> > > One is introducing new TableSource interface (higher priority),
>> > > the other is introducing new SQL syntax to interpret and emit
>> changelog.
>> > >
>> > > FLIP-105:
>> > >
>> >
>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#
>> > >
>> > > Thanks for any feedback!
>> > >
>> > > Best,
>> > > Jark
>> > >
>> >
>>
>


Re: [VOTE] FLIP-113: Supports Dynamic Table Options for Flink SQL

2020-04-10 Thread Jark Wu
+1 from my side (binding)

Best,
Jark

On Fri, 10 Apr 2020 at 17:03, Timo Walther  wrote:

> +1 (binding)
>
> Thanks for the healthy discussion. I think this feature can be useful
> during the development of a pipeline.
>
> Regards,
> Timo
>
> On 10.04.20 03:34, Danny Chan wrote:
> > Hi all,
> >
> > I would like to start the vote for FLIP-113 [1], which is discussed and
> > reached a consensus in the discussion thread [2].
> >
> > The vote will be open until April 13nd (72h), unless there is an
> > objection or not enough votes.
> >
> > Best,
> > Danny Chan
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
> > [2]
> >
> https://lists.apache.org/thread.html/r94af5d3d97e76e7dd9df68cb0becc7ba74d15591a8fab84c72fa%40%3Cdev.flink.apache.org%3E
> >
>
>


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-10 Thread Jark Wu
Hi Xiaogang,

I think this proposal doesn't conflict with your use case, you can still
chain a ProcessFunction after a source which emits raw data.
But I'm not in favor of chaining ProcessFunction after source, and we
should avoid that, because:

1) For correctness, it is necessary to perform the watermark generation as
early as possible in order to be close to the actual data
 generation within a source's data partition. This is also the purpose of
per-partition watermark and event-time alignment.
 Many on going FLIPs (e.g. FLIP-27, FLIP-95) works a lot on this effort.
Deseriazing records and generating watermark in chained
 ProcessFunction makes it difficult to do per-partition watermark in the
future.
2) In Flink SQL, a source should emit the deserialized row instead of raw
data. Otherwise, users have to define raw byte[] as the
 single column of the defined table, and parse them in queries, which is
very inconvenient.

Best,
Jark

On Fri, 10 Apr 2020 at 09:18, SHI Xiaogang  wrote:

> Hi,
>
> I don't think the proposal is a good solution to the problems. I am in
> favour of using a ProcessFunction chained to the source/sink function to
> serialize/deserialize the records, instead of embedding (de)serialization
> schema in source/sink function.
>
> Message packing is heavily used in our production environment to allow
> compression and improve throughput. As buffered messages have to be
> delivered when the time exceeds the limit, timers are also required in our
> cases. I think it's also a common need for other users.
>
> In the this proposal, with more components added into the context, in the
> end we will find the serialization/deserialization schema is just another
> wrapper of ProcessFunction.
>
> Regards,
> Xiaogang
>
> Aljoscha Krettek  于2020年4月7日周二 下午6:34写道:
>
> > On 07.04.20 08:45, Dawid Wysakowicz wrote:
> >
> > > @Jark I was aware of the implementation of SinkFunction, but it was a
> > > conscious choice to not do it that way.
> > >
> > > Personally I am against giving a default implementation to both the new
> > > and old methods. This results in an interface that by default does
> > > nothing or notifies the user only in the runtime, that he/she has not
> > > implemented a method of the interface, which does not sound like a good
> > > practice to me. Moreover I believe the method without a Collector will
> > > still be the preferred method by many users. Plus it communicates
> > > explicitly what is the minimal functionality required by the interface.
> > > Nevertheless I am happy to hear other opinions.
> >
> > Dawid and I discussed this before. I did the extension of the
> > SinkFunction but by now I think it's better to do it this way, because
> > otherwise you can implement the interface without implementing any
> methods.
> >
> > > @all I also prefer the buffering approach. Let's wait a day or two more
> > > to see if others think differently.
> >
> > I'm also in favour of buffering outside the lock.
> >
> > Also, +1 to this FLIP.
> >
> > Best,
> > Aljoscha
> >
>


Re: [DISCUSS] FLIP-71 - E2E View support in Flink SQL

2020-04-10 Thread Jark Wu
Sorry for the late reply,

I have some concern around "Supporting SHOW VIEWS|DESCRIBE VIEW name".
Currently, in SQL CLI, the "SHOW TABLES" will also list views and "DESCRIBE
name" can also describe a view.
Shall we remove the view support in those commands if we want to support a
dedicate "SHOW VIEWS|DESCRIBE VIEW name"?

Brest,
Jark

On Wed, 8 Apr 2020 at 23:49, Timo Walther  wrote:

> I didn't know that. We should definitely implement this asap. Please
> open a JIRA issue.
>
> Thanks,
> Timo
>
>
> On 08.04.20 14:29, Zhenghua Gao wrote:
> > Hi Timo,
> >
> > Actually "TEMPORARY" is not supported in table DDL now.
> > But you are right I could support "CREATE TEMPORARY VIEW" in this FLIP.
> > And may be we should open a separate JIRA ticket to track supporting it
> in
> > table DDL?
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > On Wed, Apr 8, 2020 at 7:48 PM Timo Walther  wrote:
> >
> >> Hi Zhenghua,
> >>
> >> FLINK-10232 is quite old and a lot of stuff was discussed and agreed on
> >> since then. I don't like to postpone the 'TEMPORARY' keyword because it
> >> is a important concept that is already part of the Table API (see
> >> TableEnvironment.createTemporaryView) and in function DDL and table DDL.
> >> It is not complicated to supported it in this FLIP and just a couple of
> >> line of code more.
> >>
> >> Regards,
> >> Timo
> >>
> >> On 08.04.20 11:27, Zhenghua Gao wrote:
> >>> Another concern about "CREATE DDL" is:
> >>>
> >>> FLINK-10232 proposes using "IF NOT EXISTS" to control the behavior
> when a
> >>> view or table with the same name already exists.
> >>> And "OR REPLACE" for type/library/function DDL.
> >>>
> >>> @godfrey he  I will keep the "IF NOT EXISTS"
> syntax
> >>> and postpone the "OR REPLACE" syntax until we need it.
> >>>
> >>>
> >>> *Best Regards,*
> >>> *Zhenghua Gao*
> >>>
> >>>
> >>> On Wed, Apr 8, 2020 at 4:46 PM Zhenghua Gao  wrote:
> >>>
>  Hi Timo,
> 
>  Shall we postpone the support of 'TEMPORARY' keyword since it's not
>  mentioned in FLINK-10232?
>  
> 
>  *Best Regards,*
>  *Zhenghua Gao*
> 
> 
>  On Wed, Apr 8, 2020 at 3:30 PM Timo Walther 
> wrote:
> 
> > Hi Zhenghua,
> >
> > VIEWS should also support the TEMPORARY keyword according to FLIP-64.
> >
> > Otherwise the FLIP looks good to me.
> >
> > Regards,
> > Timo
> >
> >
> > On 08.04.20 07:31, Zhenghua Gao wrote:
> >> @Danny Chan   you‘re right. I have updated
> the
> > doc.
> >>
> >> *Best Regards,*
> >> *Zhenghua Gao*
> >>
> >>
> >> On Wed, Apr 8, 2020 at 1:20 PM Danny Chan 
> >> wrote:
> >>
> >>> +1 for the proposal, a small concern for drop view statement:
> >>>
> >>> dropViewStatement:
> >>>  DROP VIEW name [ IF EXISTS ]
> >>> I think the drop statement should be
> >>> DROP VIEW [ IF EXISTS ] name
> >>>
> >>> Best,
> >>> Danny Chan
> >>> 在 2020年4月8日 +0800 AM11:54,Kurt Young ,写道:
>  This FLIP seems to be quite straightforward, +1 from my side.
> 
>  Best,
>  Kurt
> 
> 
>  On Tue, Apr 7, 2020 at 8:42 PM Zhenghua Gao 
> >> wrote:
> 
> > forward the reply to ML too.
> >
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > -- Forwarded message -
> > From: Zhenghua Gao 
> > Date: Tue, Apr 7, 2020 at 8:40 PM
> > Subject: Re: [DISCUSS] FLIP-71 - E2E View support in Flink SQL
> > To: godfrey he 
> >
> >
> >>> regarding to "Interoperability between Flink and Hive is not
> > guaranteed", can you explain this more?
> > We have several limitations of interoperability between flink
> >> objects
> >>> and
> > hive objects (tables, functions, etc).
> > So we don't promise the interoperability of views between flink
> and
> >>> hive
> > since a view is defined base on these objects.
> >
> >>> "CREATE VIEW [ IF NOT EXISTS ]"
> > This should be "CREATE VIEW [OR REPLACE]".
> >
> >>> "DESC"
> > It's a shortcut of "DESCRIBE" in SQL Client (See desc table xxx).
> > In DDL, we should only support "SHOW VIEWS" and "DESCRIBE VIEW
> >> xxx".
> >
> > I have updated the design doc, thanks.
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > On Tue, Apr 7, 2020 at 8:09 PM godfrey he 
> > wrote:
> >
> >> Hi Zhenghua,
> >>
> >> Thanks for driving this. It's one step forward that
> >> TableEnvironment
> >> supports more complete SQLs.
> >> I have a few minor questions:
> >> 1. regarding to "Interoperability between Flink and Hive is not
> >> guaranteed", can you explain this more?
>

[VOTE] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-10 Thread Jark Wu
Hi all,

I would like to start the vote for FLIP-105 [1], which is discussed and
reached a consensus in the discussion thread [2].

The vote will be open for at least 72h, unless there is an objection or not
enough votes.

Thanks,
Jark

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-105-Support-to-Interpret-and-Emit-Changelog-in-Flink-SQL-td37665.html


Re: [VOTE] FLIP-71: E2E View support in Flink SQL

2020-04-12 Thread Jark Wu
+1

Best,
Jark

On Sun, 12 Apr 2020 at 12:28, Benchao Li  wrote:

> +1 (non-binding)
>
> zoudan  于2020年4月12日周日 上午9:52写道:
>
> > +1 (non-binding)
> >
> > Best,
> > Dan Zou
> >
> >
> > > 在 2020年4月10日,09:30,Danny Chan  写道:
> > >
> > > +1 from my side.
> > >
> > > Best,
> > > Danny Chan
> > > 在 2020年4月9日 +0800 PM9:23,Timo Walther ,写道:
> > >> +1 (binding)
> > >>
> > >> Thanks for your efforts.
> > >>
> > >> Regards,
> > >> Timo
> > >>
> > >>
> > >> On 09.04.20 14:46, Zhenghua Gao wrote:
> > >>> Hi all,
> > >>>
> > >>> I'd like to start the vote for FLIP-71[1] which adds E2E view support
> > in
> > >>> Flink SQL.
> > >>> This FLIP is discussed in the thread[2].
> > >>>
> > >>> The vote will be open for at least 72 hours. Unless there is an
> > objection.
> > >>> I will try to
> > >>> close it by April 13, 2020 09:00 UTC if we have received sufficient
> > votes.
> > >>>
> > >>> [1]
> > >>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-71%3A+E2E+View+support+in+FLINK+SQL
> > >>>
> > >>> [2]
> > >>>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-71-E2E-View-support-in-Flink-SQL-td33131.html#a39787
> > >>>
> > >>> *Best Regards,*
> > >>> *Zhenghua Gao*
> > >>>
> > >>
> >
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: [VOTE] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-13 Thread Jark Wu
+1 (binding)

Best,
Jark

On Sun, 12 Apr 2020 at 09:24, Benchao Li  wrote:

> +1 (non-binding)
>
> Jark Wu  于2020年4月11日周六 上午11:31写道:
>
> > Hi all,
> >
> > I would like to start the vote for FLIP-105 [1], which is discussed and
> > reached a consensus in the discussion thread [2].
> >
> > The vote will be open for at least 72h, unless there is an objection or
> not
> > enough votes.
> >
> > Thanks,
> > Jark
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
> > [2]
> >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-105-Support-to-Interpret-and-Emit-Changelog-in-Flink-SQL-td37665.html
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


[RESULT][VOTE] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-14 Thread Jark Wu
Hi all,

The voting time for FLIP-105 has passed. I'm closing the vote now.

There were 5 +1 votes, 3 of which are binding:

- Benchao (non-binding)
- Jark (binding)
- Jingsong Li (binding)
- zoudan (non-binding)
- Kurt (binding)

There were no disapproving votes.

Thus, FLIP-105 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

Best,
Jark


Re: [DISCUSS] Releasing "fat" and "slim" Flink distributions

2020-04-15 Thread Jark Wu
+1 to the proposal. I also found the "download additional jar" step is
really verbose when I prepare webinars.

At least, I think the flink-csv and flink-json should in the distribution,
they are quite small and don't have other dependencies.

Best,
Jark

On Wed, 15 Apr 2020 at 15:44, Jeff Zhang  wrote:

> Hi Aljoscha,
>
> Big +1 for the fat flink distribution, where do you plan to put these
> connectors ? opt or lib ?
>
> Aljoscha Krettek  于2020年4月15日周三 下午3:30写道:
>
> > Hi Everyone,
> >
> > I'd like to discuss about releasing a more full-featured Flink
> > distribution. The motivation is that there is friction for SQL/Table API
> > users that want to use Table connectors which are not there in the
> > current Flink Distribution. For these users the workflow is currently
> > roughly:
> >
> >   - download Flink dist
> >   - configure csv/Kafka/json connectors per configuration
> >   - run SQL client or program
> >   - decrypt error message and research the solution
> >   - download additional connector jars
> >   - program works correctly
> >
> > I realize that this can be made to work but if every SQL user has this
> > as their first experience that doesn't seem good to me.
> >
> > My proposal is to provide two versions of the Flink Distribution in the
> > future: "fat" and "slim" (names to be discussed):
> >
> >   - slim would be even trimmer than todays distribution
> >   - fat would contain a lot of convenience connectors (yet to be
> > determined which one)
> >
> > And yes, I realize that there are already more dimensions of Flink
> > releases (Scala version and Java version).
> >
> > For background, our current Flink dist has these in the opt directory:
> >
> >   - flink-azure-fs-hadoop-1.10.0.jar
> >   - flink-cep-scala_2.12-1.10.0.jar
> >   - flink-cep_2.12-1.10.0.jar
> >   - flink-gelly-scala_2.12-1.10.0.jar
> >   - flink-gelly_2.12-1.10.0.jar
> >   - flink-metrics-datadog-1.10.0.jar
> >   - flink-metrics-graphite-1.10.0.jar
> >   - flink-metrics-influxdb-1.10.0.jar
> >   - flink-metrics-prometheus-1.10.0.jar
> >   - flink-metrics-slf4j-1.10.0.jar
> >   - flink-metrics-statsd-1.10.0.jar
> >   - flink-oss-fs-hadoop-1.10.0.jar
> >   - flink-python_2.12-1.10.0.jar
> >   - flink-queryable-state-runtime_2.12-1.10.0.jar
> >   - flink-s3-fs-hadoop-1.10.0.jar
> >   - flink-s3-fs-presto-1.10.0.jar
> >   - flink-shaded-netty-tcnative-dynamic-2.0.25.Final-9.0.jar
> >   - flink-sql-client_2.12-1.10.0.jar
> >   - flink-state-processor-api_2.12-1.10.0.jar
> >   - flink-swift-fs-hadoop-1.10.0.jar
> >
> > Current Flink dist is 267M. If we removed everything from opt we would
> > go down to 126M. I would reccomend this, because the large majority of
> > the files in opt are probably unused.
> >
> > What do you think?
> >
> > Best,
> > Aljoscha
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [DISCUSS] Releasing Flink 1.10.1

2020-04-15 Thread Jark Wu
t; >> >>>> > > > > >  increase is more likely to cause problem.
> > >> >>>> > > > > >
> > >> >>>> > > > > > So basically only people have small 'process.size' in
> > >> custom
> > >> >>>> config
> > >> >>>> > > > file
> > >> >>>> > > > > > are really affected. I'm not sure what is the
> proportion
> > of
> > >> >>>> such
> > >> >>>> > use
> > >> >>>> > > > > cases
> > >> >>>> > > > > > though. (From questions asked on the user ML, probably
> > not
> > >> >>>> much).
> > >> >>>> > > > > >
> > >> >>>> > > > > > Thank you~
> > >> >>>> > > > > >
> > >> >>>> > > > > > Xintong Song
> > >> >>>> > > > > >
> > >> >>>> > > > > >
> > >> >>>> > > > > >
> > >> >>>> > > > > > On Thu, Mar 12, 2020 at 10:09 PM Stephan Ewen <
> > >> >>>> se...@apache.org>
> > >> >>>> > > > wrote:
> > >> >>>> > > > > >
> > >> >>>> > > > > > > No need to revert it now - I am not saying it should
> > not
> > >> go
> > >> >>>> into
> > >> >>>> > > > > 1.10.1,
> > >> >>>> > > > > > I
> > >> >>>> > > > > > > am just saying this is not clear to me yet.
> > >> >>>> > > > > > >
> > >> >>>> > > > > > > We have to trade off the fact that we may break some
> > >> >>>> deployments
> > >> >>>> > > with
> > >> >>>> > > > > the
> > >> >>>> > > > > > > fact that we will "safe" a lot of new deployments.
> > >> >>>> > > > > > > I simply lack the data points / insight at the moment
> > to
> > >> >>>> > understand
> > >> >>>> > > > how
> > >> >>>> > > > > > > significant both cases are, meaning how many users
> > would
> > >> be
> > >> >>>> > > affected
> > >> >>>> > > > > and
> > >> >>>> > > > > > > how badly.
> > >> >>>> > > > > > >
> > >> >>>> > > > > > > Independent of that, improving the error message is
> > >> always
> > >> >>>> > helpful.
> > >> >>>> > > > > > >
> > >> >>>> > > > > > > On Thu, Mar 12, 2020 at 1:22 PM Andrey Zagrebin <
> > >> >>>> > > > > > > azagrebin.apa...@gmail.com>
> > >> >>>> > > > > > > wrote:
> > >> >>>> > > > > > >
> > >> >>>> > > > > > > > >   - For 1.10.1 I am not completely sure, because
> > >> users
> > >> >>>> expect
> > >> >>>> > > to
> > >> >>>> > > > > > > upgrade
> > >> >>>> > > > > > > > > that without config adjustments. That might not
> be
> > >> >>>> possible
> > >> >>>> > > with
> > >> >>>> > > > > that
> > >> >>>> > > > > > > > > change.
> > >> >>>> > > > > > > >
> > >> >>>> > > > > > > > Ok, makes sense, I will revert it for 1.10 and only
> > >> try to
> > >> >>>> > > improve
> > >> >>>> > > > > > error
> > >> >>>> > > > > > > > message and docs.
> > >> >>>> > > > > > > >
> > >> >

Re: [DISCUSS] Releasing "fat" and "slim" Flink distributions

2020-04-15 Thread Jark Wu
Hi,

I think we should first reach an consensus on "what problem do we want to
solve?"
(1) improve first experience? or (2) improve production experience?

As far as I can see, with the above discussion, I think what we want to
solve is the "first experience".
And I think the slim jar is still the best distribution for production,
because it's easier to assembling jars
than excluding jars and can avoid potential class conflicts.

If we want to improve "first experience", I think it make sense to have a
fat distribution to give users a more smooth first experience.
But I would like to call it "playground distribution" or something like
that to explicitly differ from the "slim production-purpose distribution".
The "playground distribution" can contains some widely used jars, like
universal-kafka-sql-connector, elasticsearch7-sql-connector, avro, json,
csv, etc..
Even we can provide a playground docker which may contain the fat
distribution, python3, and hive.

Best,
Jark


On Wed, 15 Apr 2020 at 21:47, Chesnay Schepler  wrote:

> I don't see a lot of value in having multiple distributions.
>
> The simple reality is that no fat distribution we could provide would
> satisfy all use-cases, so why even try.
> If users commonly run into issues for certain jars, then maybe those
> should be added to the current distribution.
>
> Personally though I still believe we should only distribute a slim
> version. I'd rather have users always add required jars to the
> distribution than only when they go outside our "expected" use-cases.
> Then we might finally address this issue properly, i.e., tooling to
> assemble custom distributions and/or better error messages if
> Flink-provided extensions cannot be found.
>
> On 15/04/2020 15:23, Kurt Young wrote:
> > Regarding to the specific solution, I'm not sure about the "fat" and
> "slim"
> > solution though. I get the idea
> > that we can make the slim one even more lightweight than current
> > distribution, but what about the "fat"
> > one? Do you mean that we would package all connectors and formats into
> > this? I'm not sure if this is
> > feasible. For example, we can't put all versions of kafka and hive
> > connector jars into lib directory, and
> > we also might need hadoop jars when using filesystem connector to access
> > data from HDFS.
> >
> > So my guess would be we might hand-pick some of the most frequently used
> > connectors and formats
> > into our "lib" directory, like kafka, csv, json metioned above, and still
> > leave some other connectors out of it.
> > If this is the case, then why not we just provide this distribution to
> > user? I'm not sure i get the benefit of
> > providing another super "slim" jar (we have to pay some costs to provide
> > another suit of distribution).
> >
> > What do you think?
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Apr 15, 2020 at 7:08 PM Jingsong Li 
> wrote:
> >
> >> Big +1.
> >>
> >> I like "fat" and "slim".
> >>
> >> For csv and json, like Jark said, they are quite small and don't have
> other
> >> dependencies. They are important to kafka connector, and important
> >> to upcoming file system connector too.
> >> So can we move them to both "fat" and "slim"? They're so important, and
> >> they're so lightweight.
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Wed, Apr 15, 2020 at 4:53 PM godfrey he  wrote:
> >>
> >>> Big +1.
> >>> This will improve user experience (special for Flink new users).
> >>> We answered so many questions about "class not found".
> >>>
> >>> Best,
> >>> Godfrey
> >>>
> >>> Dian Fu  于2020年4月15日周三 下午4:30写道:
> >>>
> >>>> +1 to this proposal.
> >>>>
> >>>> Missing connector jars is also a big problem for PyFlink users.
> >>> Currently,
> >>>> after a Python user has installed PyFlink using `pip`, he has to
> >> manually
> >>>> copy the connector fat jars to the PyFlink installation directory for
> >> the
> >>>> connectors to be used if he wants to run jobs locally. This process is
> >>> very
> >>>> confuse for users and affects the experience a lot.
> >>>>
> >>>> Regards,
> >>>> Dian
> >>>>
> >>>>> 在 2020年4

Re: [DISCUSS] Releasing Flink 1.9.3

2020-04-15 Thread Jark Wu
+1 for releasing 1.9.3 soon.
Thanks Dian for driving this!

Best,
Jark

On Wed, 15 Apr 2020 at 22:11, Congxian Qiu  wrote:

> +1 to create a new 1.9 bugfix release. and FLINK-16576[1] has merged into
> master, filed a pr for release-1.9 already
>
> [1] https://issues.apache.org/jira/browse/FLINK-16576
>
> Best,
> Congxian
>
>
> Yu Li  于2020年4月15日周三 下午9:16写道:
>
> > +1 to create a new 1.9 bug fix release. Thanks Dian for volunteering as
> our
> > RM.
> >
> > Best Regards,
> > Yu
> >
> >
> > On Wed, 15 Apr 2020 at 16:25, Hequn Cheng  wrote:
> >
> > > +1 for the release and for Dian being the RM.
> > > Thanks Jincheng for your continuous efforts on helping the releasing.
> > >
> > > Best,
> > > Hequn
> > >
> > > On Wed, Apr 15, 2020 at 3:45 PM Till Rohrmann 
> > > wrote:
> > >
> > > > Hi Dian,
> > > >
> > > > creating a new 1.9 bug fix release is a very good idea. +1 for
> creating
> > > it
> > > > soon. Also thanks for volunteering as our release manager.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Fri, Apr 10, 2020 at 7:27 AM Dian Fu 
> wrote:
> > > >
> > > > > Hi Jincheng,
> > > > >
> > > > > Thanks a lot for offering help. It would be very helpful. Thanks
> > again!
> > > > >
> > > > > Regards,
> > > > > Dian
> > > > >
> > > > > > 在 2020年4月10日,下午12:46,jincheng sun  写道:
> > > > > >
> > > > > > Hi Dian,
> > > > > >
> > > > > > Thanks for bring up the discussion. I would like to give you a
> hand
> > > at
> > > > > the
> > > > > > last stage when the RC is finished.  :)
> > > > > >
> > > > > > Best,
> > > > > > Jincheng
> > > > > >
> > > > > >
> > > > > >
> > > > > > Dian Fu  于2020年4月10日周五 上午11:08写道:
> > > > > >
> > > > > >> Hi all,
> > > > > >>
> > > > > >> It has been more than two months since we released Flink 1.9.2.
> > > There
> > > > > are
> > > > > >> already 36 improvements/bugfixes in the release-1.9 branch.
> > > > Therefore, I
> > > > > >> propose to create the next bugfix release 1.9.3 for Flink 1.9.
> > > > > >>
> > > > > >> Most notable fixes are:
> > > > > >>
> > > > > >> - [FLINK-15085] HistoryServer dashboard config json out of sync
> > > > > >> - [FLINK-15575] Azure Filesystem Shades Wrong Package
> > > "httpcomponents"
> > > > > >> - [FLINK-15638] releasing/create_release_branch.sh does not set
> > > > version
> > > > > in
> > > > > >> flink-python/pyflink/version.py
> > > > > >> - [FLINK-16242] BinaryGeneric serialization error cause
> checkpoint
> > > > > failure
> > > > > >> - [FLINK-16573] Kinesis consumer does not properly shutdown
> > > > > RecordFetcher
> > > > > >> threads
> > > > > >> - [FLINK-16047] Blink planner produces wrong aggregate results
> > with
> > > > > state
> > > > > >> clean up
> > > > > >> - [FLINK-16860] TableException: Failed to push filter into table
> > > > source!
> > > > > >> when upgrading flink to 1.9.2
> > > > > >> - [FLINK-16916] The logic of NullableSerializer#copy is wrong
> > > > > >> - [FLINK-16389] Bump Kafka 0.10 to 0.10.2.2
> > > > > >> - [FLINK-15812] HistoryServer archiving is done in Dispatcher
> main
> > > > > thread
> > > > > >> - [FLINK-17062] Fix the conversion from Java row type to Python
> > row
> > > > type
> > > > > >>
> > > > > >> Furthermore, there is one blocker issue which should be merged
> > > before
> > > > > >> 1.9.3 release:
> > > > > >>
> > > > > >> - [FLINK-16576] State inconsistency on restore with memory state
> > > > > backends
> > > > > >> (reviewing)
> > > > > >>
> > > > > >> I would volunteer as the release manager and kick off the
> release
> > > > > process.
> > > > > >> What do you think?
> > > > > >>
> > > > > >> Please let me know if there are any concerns or any other
> blocker
> > > > issues
> > > > > >> need to be fixed in 1.9.3. Thanks.
> > > > > >>
> > > > > >> Appreciated if there is any PMC could help with the final steps
> of
> > > the
> > > > > >> release process.
> > > > > >>
> > > > > >> Regards,
> > > > > >> Dian
> > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-16 Thread Jark Wu
+1 (binding)
Thanks Dawid for driving this.

Best,
Jark

On Thu, 16 Apr 2020 at 15:54, Dawid Wysakowicz 
wrote:

> Hi all,
>
> I would like to start the vote for FLIP-124 [1], which is discussed and
> reached a consensus in the discussion thread [2].
>
> The vote will be open until April 20th, unless there is an objection or
> not enough votes.
>
> Best,
>
> Dawid
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988
>
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-124-Add-open-close-and-Collector-to-De-SerializationSchema-td39864.html
>


Re: [DISCUSS] Releasing "fat" and "slim" Flink distributions

2020-04-16 Thread Jark Wu
 users that
> > > want to use a Flink dist in production. The current Flink dist is too
> > > "thin" for first-time SQL users and it is too "fat" for production
> > > users, that is where serving no-one properly with the current
> > > middle-ground. That's why I think introducing those specialized
> > > "spins" of Flink dist would be good.
> > >
> > > By the way, at some point in the future production users might not
> > > even need to get a Flink dist anymore. They should be able to have
> > > Flink as a dependency of their project (including the runtime) and
> > > then build an image from this for Kubernetes or a fat jar for YARN.
> > >
> > > Aljoscha
> > >
> > > On 15.04.20 18:14, wenlong.lwl wrote:
> > >
> > > Hi all,
> > >
> > > Regarding slim and fat distributions, I think different kinds of jobs
> > > may
> > > prefer different type of distribution:
> > >
> > > For DataStream job, I think we may not like fat distribution
> > >
> > > containing
> > >
> > > connectors because user would always need to depend on the connector
> > >
> > > in
> > >
> > > user code, it is easy to include the connector jar in the user lib.
> > >
> > > Less
> > >
> > > jar in lib means less class conflicts and problems.
> > >
> > > For SQL job, I think we are trying to encourage user to user pure
> > > sql(DDL +
> > > DML) to construct their job, In order to improve user experience, It
> > > may be
> > > important for flink, not only providing as many connector jar in
> > > distribution as possible especially the connector and format we have
> > > well
> > > documented,  but also providing an mechanism to load connectors
> > > according
> > > to the DDLs,
> > >
> > > So I think it could be good to place connector/format jars in some
> > > dir like
> > > opt/connector which would not affect jobs by default, and introduce a
> > > mechanism of dynamic discovery for SQL.
> > >
> > > Best,
> > > Wenlong
> > >
> > > On Wed, 15 Apr 2020 at 22:46, Jingsong Li  <
> > jingsongl...@gmail.com>
> > > wrote:
> > >
> > >
> > > Hi,
> > >
> > > I am thinking both "improve first experience" and "improve production
> > > experience".
> > >
> > > I'm thinking about what's the common mode of Flink?
> > > Streaming job use Kafka? Batch job use Hive?
> > >
> > > Hive 1.2.1 dependencies can be compatible with most of Hive server
> > > versions. So Spark and Presto have built-in Hive 1.2.1 dependency.
> > > Flink is currently mainly used for streaming, so let's not talk
> > > about hive.
> > >
> > > For streaming jobs, first of all, the jobs in my mind is (related to
> > > connectors):
> > > - ETL jobs: Kafka -> Kafka
> > > - Join jobs: Kafka -> DimJDBC -> Kafka
> > > - Aggregation jobs: Kafka -> JDBCSink
> > > So Kafka and JDBC are probably the most commonly used. Of course,
> > >
> > > also
> > >
> > > includes CSV, JSON's formats.
> > > So when we provide such a fat distribution:
> > > - With CSV, JSON.
> > > - With flink-kafka-universal and kafka dependencies.
> > > - With flink-jdbc.
> > > Using this fat distribution, most users can run their jobs well.
> > >
> > > (jdbc
> > >
> > > driver jar required, but this is very natural to do)
> > > Can these dependencies lead to kinds of conflicts? Only Kafka may
> > >
> > > have
> > >
> > > conflicts, but if our goal is to use kafka-universal to support all
> > > Kafka
> > > versions, it is hopeful to target the vast majority of users.
> > >
> > > We don't want to plug all jars into the fat distribution. Only need
> > > less
> > > conflict and common. of course, it is a matter of consideration to
> > >
> > > put
> > >
> > > which jar into fat distribution.
> > > We have the opportunity to facilitate the majority of users, but
> > > also left
> > > opportunities for customization.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, Apr 15, 2020 at 10:09 PM Jark Wu  <
> > imj...@gmail.

Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-04-16 Thread Jark Wu
Hi Konstantin,

Thanks for bringing this discussion. I think temporal join is a very
important feature and should be exposed to pure SQL users.
And I already received many requirements like this.
However, my concern is that how to properly support this feature in SQL.
Introducing a DDL syntax for Temporal Table Function is one way, but maybe
not the best one.

The most important reason is that the underlying of temporal table function
is exactly a changelog stream.
The temporal join is actually temporal joining a fact stream with the
changelog stream on processing time or event time.
We will soon support to create a changelog source using DDL once FLIP-95
and FLIP-105 is finished.
At that time, we can have a simple DDL to create changelog source like this;

CREATE TABLE rate_changelog (
  currency STRING,
  rate DECIMAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'rate_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'
);

In the meanwhile, we already have a SQL standard temporal join syntax [1],
i.e. the "A JOIN B FOR SYSTEM_TIME AS OF ..".
It is currently used as dimension table lookup join, but the semantic is
the same to the "temporal table function join"[2].
I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more nature
becuase the definition of B is a *table* not a *table function*,
and the syntax is included in SQL standard.

So once we have the ability to define "rate_changelog" table, then we can
use the following query to temporal join the changelog on processing time.

SELECT *
FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
ON orders.currency = rate_changelog.currency;

In a nutshell, once FLIP-95 and FLIP-105 is ready, we can easily to support
"temporal join on changelogs" without introducing new syntax.
IMO, introducing a DDL syntax for Temporal Table Function looks like not an
easy way and may have repetitive work.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function





On Thu, 16 Apr 2020 at 23:04, Benchao Li  wrote:

> Hi Konstantin,
>
> Thanks for bringing up this discussion. +1 for the idea.
> We have met this in our company too, and I planned to support it recently
> in our internal branch.
>
> regarding to your questions,
> 1) I think it might be more a table/view than function, just like Temporal
> Table (which is also known as
> dimension table). Maybe we need a DDL like CREATE VIEW and plus some
> additional settings.
> 2) If we design the DDL for it like view, then maybe temporary is ok
> enough.
>
> Konstantin Knauf  于2020年4月16日周四 下午8:16写道:
>
> > Hi everyone,
> >
> > it would be very useful if temporal tables could be created  via DDL.
> > Currently, users either need to do this in the Table API or in the
> > environment file of the Flink CLI, which both require the user to switch
> > the context of the SQL CLI/Editor. I recently created a ticket for this
> > request [1].
> >
> > I see two main questions:
> >
> > 1) What would be the DDL syntax? A Temporal Table is on the one hand a
> view
> > and on the other a function depending on how you look at it.
> >
> > 2) Would this temporal table view/function be stored in the catalog or
> only
> > be temporary?
> >
> > I personally do not have much experience in this area of Flink, so I am
> > looking forward to hearing your thoughts on this.
> >
> > Best,
> >
> > Konstantin
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-16824
> >
> > --
> >
> > Konstantin Knauf
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: [ANNOUNCE] New Apache Flink PMC Member - Hequn Chen

2020-04-17 Thread Jark Wu
Congratulations Hequn!

Best,
Jark

On Fri, 17 Apr 2020 at 15:32, Yangze Guo  wrote:

> Congratulations!
>
> Best,
> Yangze Guo
>
> On Fri, Apr 17, 2020 at 3:19 PM Jeff Zhang  wrote:
> >
> > Congratulations, Hequn!
> >
> > Paul Lam  于2020年4月17日周五 下午3:02写道:
> >
> > > Congrats Hequn! Thanks a lot for your contribution to the community!
> > >
> > > Best,
> > > Paul Lam
> > >
> > > Dian Fu  于2020年4月17日周五 下午2:58写道:
> > >
> > > > Congratulations, Hequn!
> > > >
> > > > > 在 2020年4月17日,下午2:36,Becket Qin  写道:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I am glad to announce that Hequn Chen has joined the Flink PMC.
> > > > >
> > > > > Hequn has contributed to Flink for years. He has worked on several
> > > > > components including Table / SQL,PyFlink and Flink ML Pipeline.
> > > Besides,
> > > > > Hequn is also very active in the community since the beginning.
> > > > >
> > > > > Congratulations, Hequn! Looking forward to your future
> contributions.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > > (On behalf of the Apache Flink PMC)
> > > >
> > > >
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
>


Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-04-17 Thread Jark Wu
definition that we need for a
> temporal
> > > > table join.
> > > > * Passing a parameter to a function is a known thing, passing a
> > parameter
> > > > to a VIEW not so much.
> > > > * Users would need to specify the VIEW exactly correct, such that it
> > can
> > > be
> > > > used as a temporal table. Look at 1.1 why this is not trivial.
> > > >
> > > > There is two ways to use a TableFunction:
> > > >
> > > > ### 1.3.1 Built-in and pre-registered function that is parameterized
> in
> > > the
> > > > SQL query
> > > >
> > > > Here, we do not need to do anything to register the function. We
> simply
> > > use
> > > > it in the query (see example in 2.2 below)
> > > >
> > > > ### 1.3.2 Parameterize function when it is registered in the catalog
> > > (with
> > > > a provided Java implementation)
> > > >
> > > > This is the approach, we've used so far. In the Table API, the
> function
> > > is
> > > > first parameterized and created and then registered:
> > > > We would need a DDL syntax to parameterize UDFs on registration.
> > > > I don't want to propose a syntax here, but just to get an idea it
> might
> > > > look like this:
> > > >
> > > > CREATE FUNCTION rates AS
> > > > 'org.apache.flink.table.udfs.TemporalTableFunction' WITH ('table' =
> > > > 'rates_history', 'key' = 'cur', 'time' = 'rowtime')
> > > >
> > > > Right now, the Flink Catalog interface does not have the
> functionality
> > to
> > > > store such parameters and would need some hacks to properly create
> > > properly
> > > > parameterize function instances.
> > > >
> > > >
> > > >
> > > > # 2 Defining a join of an append-only table and a temporal table
> > > >
> > > > The append-only table needs to have a time-attribute (processing time
> > or
> > > > event time, but same as the temporal table).
> > > > The join then needs to specify two things:
> > > > * an equality predicate that includes the primary key of the temporal
> > > table
> > > > * declare the time attribute of the append-only table as the time as
> of
> > > > which to look up the temporal table, i.e, get the version of the
> > temporal
> > > > table that is valid for the timestamp of the current row from the
> > > > append-only table
> > > >
> > > > The tricky part (from a syntax point of view) is to specify the
> lookup
> > > > time.
> > > >
> > > > ## 2.1 the temporal table is a regular table or view (see approaches
> > 1.1
> > > > and 1.2 above)
> > > >
> > > > In this case we can use the "FOR SYSTEM_TIME AS OF x" clause as
> > follows:
> > > >
> > > > SELECT *
> > > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
> > > > WHERE o.currency = r.currency
> > > >
> > > > IMO, this is a great syntax and the one we should strive for.
> > > > We would need to bend the rules of the SQL standard which only
> allows x
> > > in
> > > > "FOR SYSTEM_TIME AS OF x" to be a constant and the table on which it
> is
> > > > applied usually needs to be a specific type (not sure if views are
> > > > supported), but I guess this is fine.
> > > > NOTE: the "FOR SYSTEM_TIME AS OF x" is already supported for
> > LookupTable
> > > > Joins if x is a processing time attribute [2].
> > > >
> > > > ## 2.2 the temporal table is a TableFunction and parameterized in the
> > > query
> > > > (see 1.3.1 above)
> > > >
> > > > SELECT *
> > > > FROM orders o,
> > > >TEMPORAL_TABLE(
> > > >  table => TABLE(rates_history),
> > > >  key => DESCRIPTOR(currency),
> > > >  time => DESCRIPTOR(rowtime)) r
> > > >ON o.currency = r.currency
> > > >
> > > > The function "TEMPORAL_TABLE" is built-in and nothing was registered
> in
> > > the
> > > > catalog (except the rates_history table).
> > > > In fact this is valid SQL:2016 syntax and called Polymorphic Table
> >

Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-04-18 Thread Jark Wu
Hi Fabian,

Just to clarify a little bit, we decided to move the "converting
append-only table into changelog table" into future work.
So FLIP-105 only introduced some CDC formats (debezium) and new TableSource
interfaces proposed in FLIP-95.
I should have started a new FLIP for the new CDC formats and keep FLIP-105
as it is to avoid the confusion, sorry about that.

Best,
Jark


On Sat, 18 Apr 2020 at 00:35, Fabian Hueske  wrote:

> Thanks Jark!
>
> I certainly need to read up on FLIP-105 (and I'll try to adjust my
> terminology to changelog table from now on ;-) )
> If FLIP-105 addresses the issue of converting an append-only table into a
> changelog table that upserts on primary key (basically what the VIEW
> definition in my first email did),
> TEMPORAL VIEWs become much less important.
> In that case, we would be well served with TEMPORAL TABLE and TEMPORAL VIEW
> would be a nice-to-have feature for some later time.
>
> Cheers, Fabian
>
>
>
>
>
>
> Am Fr., 17. Apr. 2020 um 18:13 Uhr schrieb Jark Wu :
>
> > Hi Fabian,
> >
> > I think converting an append-only table into temporal table contains two
> > things:
> > (1) converting append-only table into changelog table (or retraction
> table
> > as you said)
> > (2) define the converted changelog table (maybe is a view now) as
> temporal
> > (or history tracked).
> >
> > The first thing is also mentioned and discussed in FLIP-105 design draft
> > [1] which proposed a syntax
> > to convert the append-only table into a changelog table.
> >
> > I think TEMPORAL TABLE is quite straightforward and simple, and can
> satisfy
> > most existing changelog
> > data with popular CDC formats. TEMPORAL VIEW is flexible but will involve
> > more SQL codes. I think
> > we can support them both.
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> >
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
> >
> > On Fri, 17 Apr 2020 at 23:52, Fabian Hueske  wrote:
> >
> > > Hi,
> > >
> > > I agree with most of what Timo said.
> > >
> > > The TEMPORAL keyword (which unfortunately might be easily confused with
> > > TEMPORARY...) looks very intuitive and I think using the only time
> > > attribute for versioning would be a good choice.
> > >
> > > However, TEMPORAL TABLE on retraction tables do not solve the full
> > problem.
> > > I believe there will be also cases where we need to derive a temporal
> > table
> > > from an append only table (what TemporalTableFunctions do right now).
> > > I think the best choice for this would be TEMPORAL VIEW but as I
> > explained,
> > > it might be a longer way until this can be supported.
> > > TEMPORAL VIEW would also address the problem of preprocessing.
> > >
> > > > Regarding retraction table with a primary key and a time-attribute:
> > > > These semantics are still unclear to me. Can retractions only occur
> > > > within watermarks? Or are they also used for representing late
> updates?
> > >
> > > Time attributes and retraction streams are a challenging topic that I
> > > haven't completely understood yet.
> > > So far we treated time attributes always as part of the data.
> > > In combination with retractions, it seems that they become metadata
> that
> > > specifies when a change was done.
> > > I think this is different from treating time attributes as regular
> data.
> > >
> > > Cheers, Fabian
> > >
> > >
> > > Am Fr., 17. Apr. 2020 um 17:23 Uhr schrieb Seth Wiesman <
> > > sjwies...@gmail.com
> > > >:
> > >
> > > > I really like the TEMPORAL keyword, I find it very intuitive.
> > > >
> > > > The down side of this approach would be that an additional
> > preprocessing
> > > > > step would not be possible anymore because there is no preceding
> > view.
> > > > >
> > > >
> > > >  Yes and no. My understanding is we are not talking about making any
> > > > changes to how temporal tables are defined in the table api. Since
> you
> > > > cannot currently define temporal table functions in pure SQL
> > > applications,
> > > > but only pre-register them in YAML, you can't do any pre-processing
> as
> > it
> > > > stands today. Preprocessing may be a generally useful feature, I'm
> not
> > > > sure, but this 

Re: [DISCUSS] Exact feature freeze date

2020-04-23 Thread Jark Wu
+1

Thanks,
Jark

On Thu, 23 Apr 2020 at 22:36, Xintong Song  wrote:

> +1
> From our side we can also benefit from the extending of feature freeze, for
> pluggable slot allocation, GPU support and perjob mode on Kubernetes
> deployment.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Apr 23, 2020 at 10:31 PM Timo Walther  wrote:
>
> >  From the SQL side, I'm sure that FLIP-95 and FLIP-105 could benefit
> > from extending the feature freeze.
> >
> > Thanks,
> > Timo
> >
> > On 23.04.20 16:11, Aljoscha Krettek wrote:
> > > +1
> > >
> > > Aljoscha
> > >
> > > On 23.04.20 15:23, Till Rohrmann wrote:
> > >> +1 for extending the feature freeze until May 15th.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Thu, Apr 23, 2020 at 1:00 PM Piotr Nowojski 
> > >> wrote:
> > >>
> > >>> Hi Stephan,
> > >>>
> > >>> As release manager I’ve seen that quite a bit of features could use
> > >>> of the
> > >>> extra couple of weeks. This also includes some features that I’m
> > >>> involved
> > >>> with, like FLIP-76, or limiting the in-flight buffers.
> > >>>
> > >>> +1 From my side for extending the feature freeze until May 15th.
> > >>>
> > >>> Piotrek
> > >>>
> >  On 23 Apr 2020, at 10:10, Stephan Ewen  wrote:
> > 
> >  Hi all!
> > 
> >  I want to bring up a discussion about when we want to do the feature
> > >>> freeze
> >  for 1.11.
> > 
> >  When kicking off the release cycle, we tentatively set the date to
> >  end of
> >  April, which would be in one week.
> > 
> >  I can say from the features I am involved with (FLIP-27, FLIP-115,
> >  reviewing some state backend improvements, etc.) that it would be
> >  helpful
> >  to have two additional weeks.
> > 
> >  When looking at various other feature threads, my feeling is that
> > there
> > >>> are
> >  more contributors and committers that could use a few more days.
> >  The last two months were quite exceptional in and we did lose a bit
> of
> >  development speed here and there.
> > 
> >  How do you think about making *May 15th* the feature freeze?
> > 
> >  Best,
> >  Stephan
> > >>>
> > >>>
> > >>
> >
> >
>


Re: [ANNOUNCE] Apache Flink 1.9.3 released

2020-04-26 Thread Jark Wu
Thanks Dian for being the release manager and thanks all who make this
possible.

Best,
Jark

On Sun, 26 Apr 2020 at 18:06, Leonard Xu  wrote:

> Thanks Dian for the release and being the release manager !
>
> Best,
> Leonard Xu
>
>
> 在 2020年4月26日,17:58,Benchao Li  写道:
>
> Thanks Dian for the effort, and all who make this release possible. Great
> work!
>
> Konstantin Knauf  于2020年4月26日周日 下午5:21写道:
>
>> Thanks for managing this release!
>>
>> On Sun, Apr 26, 2020 at 3:58 AM jincheng sun 
>> wrote:
>>
>>> Thanks for your great job, Dian!
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>>> Hequn Cheng  于2020年4月25日周六 下午8:30写道:
>>>
 @Dian, thanks a lot for the release and for being the release manager.
 Also thanks to everyone who made this release possible!

 Best,
 Hequn

 On Sat, Apr 25, 2020 at 7:57 PM Dian Fu  wrote:

> Hi everyone,
>
> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.9.3, which is the third bugfix release for the Apache Flink
> 1.9 series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data 
> streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> https://flink.apache.org/news/2020/04/24/release-1.9.3.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12346867
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> Also great thanks to @Jincheng for helping finalize this release.
>
> Regards,
> Dian
>

>>
>> --
>> Konstantin Knauf | Head of Product
>> +49 160 91394525
>>
>> Follow us @VervericaData Ververica 
>>
>> --
>> Join Flink Forward  - The Apache Flink
>> Conference
>> Stream Processing | Event Driven | Real Time
>> --
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Tony) Cheng
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>


Re: [DISCUSS] Should max/min be part of the hierarchy of config option?

2020-04-27 Thread Jark Wu
+1 for xyz.[min|max]

This is already mentioned in the Code Style Guideline [1].

Best,
Jark


[1]:
https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes

On Mon, 27 Apr 2020 at 21:33, Flavio Pompermaier 
wrote:

> +1 for Chesnay approach
>
> On Mon, Apr 27, 2020 at 2:31 PM Chesnay Schepler 
> wrote:
>
> > +1 for xyz.[min|max]; imo it becomes obvious if think of it like a yaml
> > file:
> >
> > xyz:
> >  min:
> >  max:
> >
> > opposed to
> >
> > min-xyz:
> > max-xyz:
> >
> > IIRC this would also be more in-line with the hierarchical scheme for
> > config options we decided on months ago.
> >
> > On 27/04/2020 13:25, Xintong Song wrote:
> > > +1 for Robert's idea about adding tests/tools checking the pattern of
> new
> > > configuration options, and migrate the old ones in release 2.0.
> > >
> > > Concerning the preferred pattern, I personally agree with Till's
> > opinion. I
> > > think 'xyz.[min|max]' somehow expresses that 'min' and 'max' are
> > properties
> > > of 'xyz', while 'xyz' may also have other properties. An example could
> be
> > > 'taskmanager.memory.network.[min|max|fraction]'.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Mon, Apr 27, 2020 at 6:00 PM Till Rohrmann 
> > wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> as Robert said I think the problem is that we don't have strict
> > guidelines
> > >> and every committer follows his/her personal taste. I'm actually not
> > sure
> > >> whether we can define bullet-proof guidelines but we can definitely
> > >> make them more concrete.
> > >>
> > >> In this case here, I have to admit that I have an opposing
> > opinion/taste. I
> > >> think it would be better to use xyz.min and xyz.max. The reason is
> that
> > we
> > >> configure a property of xyz which consists of the minimum and maximum
> > >> value. Differently said, min and max belong semantically together and
> > hence
> > >> should be defined together. You can think of it as if the type of the
> > xyz
> > >> config option would be a tuple of two integers instead of two
> individual
> > >> integers.
> > >>
> > >> A comment concerning the existing styles of config options: I think
> > many of
> > >> the config options which follow the max-xzy pattern are actually older
> > >> configuration options.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Mon, Apr 27, 2020 at 10:34 AM Robert Metzger 
> > >> wrote:
> > >>
> > >>> Thanks for starting this discussion.
> > >>> I believe the different options are a lot about personal taste, there
> > are
> > >>> no objective arguments why one option is better than the other.
> > >>>
> > >>> I agree with your proposal to simply go with the "max-xyz" pattern,
> as
> > >> this
> > >>> is the style of the majority of the current configuration options in
> > >> Flink
> > >>> (maybe this also means it is the taste of the majority of Flink
> > >>> developers?).
> > >>>
> > >>> I would propose to add a test or some tooling that checks that all
> new
> > >>> configuration parameters follow this pattern, as well as tickets for
> > >> Flink
> > >>> 2.0 to migrate the "wrong" configuration options.
> > >>>
> > >>>
> > >>>
> > >>> On Wed, Apr 22, 2020 at 5:47 AM Yangze Guo 
> wrote:
> > >>>
> >  Hi, everyone,
> > 
> >  I'm working on FLINK-16605 Add max limitation to the total number of
> >  slots[1]. In the PR, I, Gary and Xintong has a discussion[2] about
> the
> >  config option of this limit.
> >  The central question is whether the "max" should be part of the
> >  hierarchy or part of the property itself.
> > 
> >  It means there could be two patterns:
> >  - max-xyz
> >  - xyz.max
> > 
> >  Currently, there is no clear consensus on which style is better and
> we
> >  could find both patterns in the current Flink. Here, I'd like to
> first
> >  sort out[3]:
> > 
> >  Config options follow the "max-xyz" pattern:
> >  - restart-strategy.failure-rate.max-failures-per-interval
> >  - yarn.maximum-failed-containers
> >  - state.backend.rocksdb.compaction.level.max-size-level-base
> >  - cluster.registration.max-timeout
> >  - high-availability.zookeeper.client.max-retry-attempts
> >  - rest.client.max-content-length
> >  - rest.retry.max-attempts
> >  - rest.server.max-content-length
> >  - jobstore.max-capacity
> >  - taskmanager.registration.max-backoff
> >  - compiler.delimited-informat.max-line-samples
> >  - compiler.delimited-informat.min-line-samples
> >  - compiler.delimited-informat.max-sample-len
> >  - taskmanager.runtime.max-fan
> >  - pipeline.max-parallelism
> >  - execution.checkpointing.max-concurrent-checkpoint
> >  - execution.checkpointing.min-pause
> > 
> >  Config options follow the "xyz.max" pattern:
> >  - taskmanager.memory.jvm-overhead.max
> >  - taskmanager.memory.jvm-overhead.min
> >  - taskmanager.memor

Re: Question about FLIP-66

2020-04-27 Thread Jark Wu
Hi Jungtaek,

Kurt has said what I want to say. I will add some background.
Flink Table API & SQL only supports to define processing-time attribute and
event-time attribute (watermark) on source, not support to define a new one
in query.
The time attributes will pass through the query and time-based operations
can only apply on the time attributes.

The reason why Flink Table & SQL only supports to define watermark on
source is that this can allow us to do per-partition watermark, source idle
and simplify things.
There are also some discussion about "disable arbitrary watermark assigners
in the middle of a pipeline in DataStream" in this JIRA issue comments.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-11286


On Tue, 28 Apr 2020 at 09:28, Kurt Young  wrote:

> The current behavior is later. Flink gets time attribute column from source
> table, and tries to analyze and keep
> the time attribute column as much as possible, e.g. simple projection or
> filter which doesn't effect the column
> will keep the time attribute, window aggregate will generate its own time
> attribute if you select window_start or
> window_end. But you're right, sometimes framework will loose the
> information about time attribute column, and
> after that, some operations will throw exception.
>
> Best,
> Kurt
>
>
> On Tue, Apr 28, 2020 at 7:45 AM Jungtaek Lim  >
> wrote:
>
> > Hi devs,
> >
> > I'm interesting about the new change on FLIP-66 [1], because if I
> > understand correctly, Flink hasn't been having event-time timestamp field
> > (column) as a part of "normal" schema, and FLIP-66 tries to change it.
> >
> > That sounds as the column may be open for modification, like rename
> (alias)
> > or some other operations, or even be dropped via projection. Will such
> > operations affect event-time timestamp for the record? If you have an
> idea
> > about how Spark Structured Streaming works with watermark then you might
> > catch the point.
> >
> > Maybe the question could be reworded as, does the definition of event
> time
> > timestamp column on DDL only project to the source definition, or it will
> > carry over the entire query and let operator determine such column as
> > event-time timestamp. (SSS works as latter.) I think this is a huge
> > difference, as for me it's like stability vs flexibility, and there're
> > drawbacks on latter (there're also drawbacks on former as well, but
> > computed column may cover up).
> >
> > Thanks in advance!
> > Jungtaek Lim (HeartSaVioR)
> >
> > 1.
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL
> >
>


Re: Question about FLIP-66

2020-04-27 Thread Jark Wu
Hi Jungtaek,

Yes. Your understanding is correct :)

Best,
Jark

On Tue, 28 Apr 2020 at 11:58, Jungtaek Lim 
wrote:

> Thanks Kurt and Jark for the detailed explanation! Pretty much helped to
> understand about FLIP-66.
>
> That sounds as Flink won't leverage timestamp in StreamRecord (which is
> hidden and cannot modified easily) and handles the time semantic by the
> input schema for the operation, to unify the semantic between batch and
> stream. Did I understand it correctly?
>
> I'm not familiar with internal of Flink so not easy to consume the
> information in FLINK-11286, but in general I'd be supportive with defining
> watermark as close as possible from source, as it'll be easier to reason
> about. (I basically refer to timestamp assigner instead of watermark
> assigner though.)
>
> - Jungtaek Lim
>
> On Tue, Apr 28, 2020 at 11:37 AM Jark Wu  wrote:
>
> > Hi Jungtaek,
> >
> > Kurt has said what I want to say. I will add some background.
> > Flink Table API & SQL only supports to define processing-time attribute
> and
> > event-time attribute (watermark) on source, not support to define a new
> one
> > in query.
> > The time attributes will pass through the query and time-based operations
> > can only apply on the time attributes.
> >
> > The reason why Flink Table & SQL only supports to define watermark on
> > source is that this can allow us to do per-partition watermark, source
> idle
> > and simplify things.
> > There are also some discussion about "disable arbitrary watermark
> assigners
> > in the middle of a pipeline in DataStream" in this JIRA issue comments.
> >
> > Best,
> > Jark
> >
> > [1]: https://issues.apache.org/jira/browse/FLINK-11286
> >
> >
> > On Tue, 28 Apr 2020 at 09:28, Kurt Young  wrote:
> >
> > > The current behavior is later. Flink gets time attribute column from
> > source
> > > table, and tries to analyze and keep
> > > the time attribute column as much as possible, e.g. simple projection
> or
> > > filter which doesn't effect the column
> > > will keep the time attribute, window aggregate will generate its own
> time
> > > attribute if you select window_start or
> > > window_end. But you're right, sometimes framework will loose the
> > > information about time attribute column, and
> > > after that, some operations will throw exception.
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Tue, Apr 28, 2020 at 7:45 AM Jungtaek Lim <
> > kabhwan.opensou...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I'm interesting about the new change on FLIP-66 [1], because if I
> > > > understand correctly, Flink hasn't been having event-time timestamp
> > field
> > > > (column) as a part of "normal" schema, and FLIP-66 tries to change
> it.
> > > >
> > > > That sounds as the column may be open for modification, like rename
> > > (alias)
> > > > or some other operations, or even be dropped via projection. Will
> such
> > > > operations affect event-time timestamp for the record? If you have an
> > > idea
> > > > about how Spark Structured Streaming works with watermark then you
> > might
> > > > catch the point.
> > > >
> > > > Maybe the question could be reworded as, does the definition of event
> > > time
> > > > timestamp column on DDL only project to the source definition, or it
> > will
> > > > carry over the entire query and let operator determine such column as
> > > > event-time timestamp. (SSS works as latter.) I think this is a huge
> > > > difference, as for me it's like stability vs flexibility, and
> there're
> > > > drawbacks on latter (there're also drawbacks on former as well, but
> > > > computed column may cover up).
> > > >
> > > > Thanks in advance!
> > > > Jungtaek Lim (HeartSaVioR)
> > > >
> > > > 1.
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL
> > > >
> > >
> >
>


Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation

2020-04-28 Thread Jark Wu
Hi lsyldliu,

Thanks for investigating this.

First of all, if you are using mini-batch deduplication, it doesn't support
state ttl in 1.9. That's why the tps looks the same with 1.11 disable state
ttl.
We just introduce state ttl for mini-batch deduplication recently.

Regarding to the performance regression, it looks very surprise to me. The
performance is reduced by 19x when StateTtlConfig is enabled in 1.11.
I don't have much experience of the underlying of StateTtlConfig. So I loop
in @Yu Li  @YunTang in CC who may have more insights on
this.

For more information, we use the following StateTtlConfig [1] in blink
planner:

StateTtlConfig
  .newBuilder(Time.milliseconds(retentionTime))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build();


Best,
Jark


[1]:
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27





On Wed, 29 Apr 2020 at 11:53, 刘大龙  wrote:

> Hi, all!
>
> At flink master branch, we have supported state ttl  for sql mini-batch
> deduplication using incremental cleanup strategy on heap backend, refer to
> FLINK-16581. Because I want to test the performance of this feature, so I
> compile master branch code and deploy the jar to production
> environment,then run three types of tests, respectively:
>
>
>
>
> flink 1.9.0 release version enable state ttl
> flink 1.11-snapshot version disable state ttl
> flink 1.11-snapshot version enable state ttl
>
>
>
>
> The test query sql as follows:
>
> select order_date,
> sum(price * amount - goods_all_fav_amt - virtual_money_amt +
> goods_carriage_amt) as saleP,
> sum(amount) as saleN,
> count(distinct parent_sn) as orderN,
> count(distinct user_id) as cusN
>from(
> select order_date, user_id,
> order_type, order_status, terminal, last_update_time,
> goods_all_fav_amt,
> goods_carriage_amt, virtual_money_amt, price, amount,
> order_quality, quality_goods_cnt, acture_goods_amt
> from (select *, row_number() over(partition by order_id,
> order_goods_id order by proctime desc) as rownum from dm_trd_order_goods)
> where rownum=1
> and (order_type in (1,2,3,4,5) or order_status = 70)
> and terminal = 'shop' and price > 0)
> group by order_date
>
>
> At runtime, this query will generate two operators which include
> Deduplication and GroupAgg. In the test, the configuration is same,
> parallelism is 20, set kafka consumer from the earliest, and disable
> mini-batch function, The test results as follows:
>
> flink 1.9.0 enable state ttl:this test lasted 44m, flink receive 1374w
> records, average tps at 5200/s, Flink UI picture link back pressure,
> checkpoint
> flink 1.11-snapshot version disable state ttl:this test lasted 28m, flink
> receive 883w records, average tps at 5200/s, Flink UI picture link back
> pressure, checkpoint
> flink 1.11-snapshot version enable state ttl:this test lasted 1h 43m,
> flink only receive 168w records because of deduplication operator serious
> back pressure, average tps at 270/s, moreover, checkpoint always fail
> because of deduplication operator serious back pressure, Flink UI picture
> link back pressure, checkpoint
>
> Deduplication state clean up implement in flink 1.9.0 use timer, but
> 1.11-snapshot version use StateTtlConfig, this is the main difference.
> Comparing the three tests comprehensively, we can see that if disable state
> ttl in 1.11-snapshot the performance is the same with 1.9.0 enable state
> ttl. However, if enable state ttl in 1.11-snapshot, performance down is
> nearly 20 times, so I think incremental cleanup strategy cause this
> problem, what do you think about it? @azagrebin, @jark.
>
> Thanks.
>
> lsyldliu
>
> Zhejiang University, College of Control Science and engineer, CSC


Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Jark Wu
>From a user's perspective, I prefer the shorter one "format=json", because
it's more concise and straightforward. The "kind" is redundant for users.
Is there a real case requires to represent the configuration in JSON style?
As far as I can see, I don't see such requirement, and everything works
fine by now.

So I'm in favor of "format=json". But if the community insist to follow
code style on this, I'm also fine with the longer one.

Btw, I also CC user mailing list to listen more user's feedback. Because I
think this is relative to usability.

Best,
Jark

On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler  wrote:

>  > Therefore, should we advocate instead:
>  >
>  > 'format.kind' = 'json',
>  > 'format.fail-on-missing-field' = 'false'
>
> Yes. That's pretty much it.
>
> This is reasonable important to nail down as with such violations I
> believe we could not actually switch to a standard YAML parser.
>
> On 29/04/2020 16:05, Timo Walther wrote:
> > Hi everyone,
> >
> > discussions around ConfigOption seem to be very popular recently. So I
> > would also like to get some opinions on a different topic.
> >
> > How do we represent hierarchies in ConfigOption? In FLIP-122, we
> > agreed on the following DDL syntax:
> >
> > CREATE TABLE fs_table (
> >  ...
> > ) WITH (
> >  'connector' = 'filesystem',
> >  'path' = 'file:///path/to/whatever',
> >  'format' = 'csv',
> >  'format.allow-comments' = 'true',
> >  'format.ignore-parse-errors' = 'true'
> > );
> >
> > Of course this is slightly different from regular Flink core
> > configuration but a connector still needs to be configured based on
> > these options.
> >
> > However, I think this FLIP violates our code style guidelines because
> >
> > 'format' = 'json',
> > 'format.fail-on-missing-field' = 'false'
> >
> > is an invalid hierarchy. `format` cannot be a string and a top-level
> > object at the same time.
> >
> > We have similar problems in our runtime configuration:
> >
> > state.backend=
> > state.backend.incremental=
> > restart-strategy=
> > restart-strategy.fixed-delay.delay=
> > high-availability=
> > high-availability.cluster-id=
> >
> > The code style guide states "Think of the configuration as nested
> > objects (JSON style)". So such hierarchies cannot be represented in a
> > nested JSON style.
> >
> > Therefore, should we advocate instead:
> >
> > 'format.kind' = 'json',
> > 'format.fail-on-missing-field' = 'false'
> >
> > What do you think?
> >
> > Thanks,
> > Timo
> >
> > [1]
> >
> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
> >
>
>


Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-29 Thread Jark Wu
Looks like the ES NOTICE problem is a long-standing problem, because the
ES6 sql connector NOTICE also misses these dependencies.

Best,
Jark

On Wed, 29 Apr 2020 at 17:26, Robert Metzger  wrote:

> Thanks for taking a look Chesnay. Then let me officially cancel the
> release:
>
> -1 (binding)
>
>
> Another question that I had while checking the release was the
> "apache-flink-1.10.1.tar.gz" binary, which I suppose is the python
> distribution.
> It does not contain a LICENSE and NOTICE file at the root level (which is
> okay [1] for binary releases), but in the "pyflink/" directory. There is
> also a "deps/" directory, which contains a full distribution of Flink,
> without any license files.
> I believe it would be a little bit nicer to have the LICENSE and NOTICE
> file in the root directory (if the python wheels format permits) to make
> sure it is obvious that all binary release contents are covered by these
> files.
>
>
> [1]
> http://www.apache.org/legal/release-policy.html#licensing-documentation
>
>
>
>
> On Wed, Apr 29, 2020 at 11:10 AM Congxian Qiu 
> wrote:
>
> > Thanks a lot for creating a release candidate for 1.10.1!
> >
> > +1 from my side
> >
> > checked
> > - md5/gpg, ok
> > - source does not contain any binaries, ok
> > - pom points to the same version 1.10.1, ok
> > - README file does not contain anything unexpected, ok
> > - maven clean package -DskipTests, ok
> > - maven clean verify, encounter a test timeout exception, but I think it
> > does not block the RC(have created an issue[1] to track it),
> > - run demos on a stand-alone cluster, ok
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-17458
> > Best,
> > Congxian
> >
> >
> > Robert Metzger  于2020年4月29日周三 下午2:54写道:
> >
> > > Thanks a lot for creating a release candidate for 1.10.1!
> > >
> > > I'm not sure, but I think found a potential issue in the release while
> > > checking dependency changes on the ElasticSearch7 connector:
> > >
> > >
> >
> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
> > >
> > > In this change, "com.carrotsearch:hppc" has been added to the shaded
> jar
> > (
> > >
> > >
> >
> https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar
> > > ),
> > > without including proper mention of that dependency in
> "META-INF/NOTICE".
> > >
> > >
> > > My checking notes:
> > >
> > > - checked the diff for dependency changes:
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1
> > > (w/o
> > > <
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1(w/o
> > >
> > > release commit:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...0e2b520ec60cc11dce210bc38e574a05fa5a7734
> > > )
> > >   - flink-connector-hive sets the derby version for test-scoped
> > > dependencies:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-f4dbf40e8457457eb01ae22b53baa3ec
> > >  - no NOTICE file found, but this module does not forward binaries.
> > >   - kafka 0.10 minor version upgrade:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0287a3f3c37b454c583b6b56de1392e4
> > >   - NOTICE change found
> > >- ES7 changes shading:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-bd2211176ab6e7fa83ffeaa89481ff38
> > >  - problem found
> > >   - Influxdb version change
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0d2cce4875b2804ab89c3343a7de1ca6
> > >  - NOTICE change found
> > >
> > >
> > >
> > > On Fri, Apr 24, 2020 at 8:10 PM Yu Li  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Please review and vote on the release candidate #1 for version
> 1.10.1,
> > as
> > > > follows:
> > > > [ ] +1, Approve the release
> > > > [ ] -1, Do not approve the release (please provide specific comments)
> > > >
> > > >
> > > > The complete staging area is available for your review, which
> includes:
> > > > * JIRA release notes [1],
> > > > * the official Apache source release and binary convenience releases
> to
> > > be
> > > > deployed to dist.apache.org [2], which are signed with the key with
> > > > fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3],
> > > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > > * source code tag "release-1.10.1-rc1" [5],
> > > > * website pull request listing the new release and adding
> announcement
> > > blog
> > > > post [6].
> > > >
> > > > The vote will be open for at least 72 hours. It is adopted by
> majority
> > > > approval, with at least 3 PMC affirmative votes.
> > > >
> > > > Thanks,
> > > > Yu
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/secure

Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-29 Thread Jark Wu
Hi Chesnay,

I mean `flink-sql-connector-elasticsearch6`.
Because this dependency change on elasticserch7 [1] is totally following
how elasticsearch6 does. And they have the almost same dependencies.

Best,
Jark

[1]:
https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38

On Thu, 30 Apr 2020 at 14:44, Chesnay Schepler  wrote:

> ES6 isn't bundling these dependencies.
>
> On 29/04/2020 17:29, Jark Wu wrote:
> > Looks like the ES NOTICE problem is a long-standing problem, because the
> > ES6 sql connector NOTICE also misses these dependencies.
> >
> > Best,
> > Jark
> >
> > On Wed, 29 Apr 2020 at 17:26, Robert Metzger 
> wrote:
> >
> >> Thanks for taking a look Chesnay. Then let me officially cancel the
> >> release:
> >>
> >> -1 (binding)
> >>
> >>
> >> Another question that I had while checking the release was the
> >> "apache-flink-1.10.1.tar.gz" binary, which I suppose is the python
> >> distribution.
> >> It does not contain a LICENSE and NOTICE file at the root level (which
> is
> >> okay [1] for binary releases), but in the "pyflink/" directory. There is
> >> also a "deps/" directory, which contains a full distribution of Flink,
> >> without any license files.
> >> I believe it would be a little bit nicer to have the LICENSE and NOTICE
> >> file in the root directory (if the python wheels format permits) to make
> >> sure it is obvious that all binary release contents are covered by these
> >> files.
> >>
> >>
> >> [1]
> >> http://www.apache.org/legal/release-policy.html#licensing-documentation
> >>
> >>
> >>
> >>
> >> On Wed, Apr 29, 2020 at 11:10 AM Congxian Qiu 
> >> wrote:
> >>
> >>> Thanks a lot for creating a release candidate for 1.10.1!
> >>>
> >>> +1 from my side
> >>>
> >>> checked
> >>> - md5/gpg, ok
> >>> - source does not contain any binaries, ok
> >>> - pom points to the same version 1.10.1, ok
> >>> - README file does not contain anything unexpected, ok
> >>> - maven clean package -DskipTests, ok
> >>> - maven clean verify, encounter a test timeout exception, but I think
> it
> >>> does not block the RC(have created an issue[1] to track it),
> >>> - run demos on a stand-alone cluster, ok
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-17458
> >>> Best,
> >>> Congxian
> >>>
> >>>
> >>> Robert Metzger  于2020年4月29日周三 下午2:54写道:
> >>>
> >>>> Thanks a lot for creating a release candidate for 1.10.1!
> >>>>
> >>>> I'm not sure, but I think found a potential issue in the release while
> >>>> checking dependency changes on the ElasticSearch7 connector:
> >>>>
> >>>>
> >>
> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
> >>>> In this change, "com.carrotsearch:hppc" has been added to the shaded
> >> jar
> >>> (
> >>>>
> >>
> https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar
> >>>> ),
> >>>> without including proper mention of that dependency in
> >> "META-INF/NOTICE".
> >>>>
> >>>> My checking notes:
> >>>>
> >>>> - checked the diff for dependency changes:
> >>>>
> >>
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1
> >>>> (w/o
> >>>> <
> >>
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1(w/o
> >>>> release commit:
> >>>>
> >>>>
> >>
> https://github.com/apache/flink/compare/release-1.10.0...0e2b520ec60cc11dce210bc38e574a05fa5a7734
> >>>> )
> >>>>- flink-connector-hive sets the derby version for test-scoped
> >>>> dependencies:
> >>>>
> >>>>
> >>
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-f4dbf40e8457457eb01ae22b53baa3ec
> >>>>   - no NOTICE file found, but this module does not forward
> binaries.
> &

Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-30 Thread Jark Wu
Thanks for the tip! I checked it and you are right :)

On Thu, 30 Apr 2020 at 15:08, Chesnay Schepler  wrote:

> flink-sql-connector-elasticsearch6 isn't bundling com.carrotsearch:hppc,
> nor does it have dependencies on org.elasticsearch:elasticsearch-geo,
> org.elasticsearch.plugin:lang-mustache-client nor
> com.github.spullara.mustache.java:compiler (and thus is also not bundling
> them).
>
> You can check this yourself by packaging the connector and comparing the
> shade-plugin output with the NOTICE file.
>
> On 30/04/2020 08:55, Jark Wu wrote:
>
> Hi Chesnay,
>
> I mean `flink-sql-connector-elasticsearch6`.
> Because this dependency change on elasticserch7 [1] is totally following
> how elasticsearch6 does. And they have the almost same dependencies.
>
> Best,
> Jark
>
> [1]:
> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
>
> On Thu, 30 Apr 2020 at 14:44, Chesnay Schepler  wrote:
>
>> ES6 isn't bundling these dependencies.
>>
>> On 29/04/2020 17:29, Jark Wu wrote:
>> > Looks like the ES NOTICE problem is a long-standing problem, because the
>> > ES6 sql connector NOTICE also misses these dependencies.
>> >
>> > Best,
>> > Jark
>> >
>> > On Wed, 29 Apr 2020 at 17:26, Robert Metzger 
>> wrote:
>> >
>> >> Thanks for taking a look Chesnay. Then let me officially cancel the
>> >> release:
>> >>
>> >> -1 (binding)
>> >>
>> >>
>> >> Another question that I had while checking the release was the
>> >> "apache-flink-1.10.1.tar.gz" binary, which I suppose is the python
>> >> distribution.
>> >> It does not contain a LICENSE and NOTICE file at the root level (which
>> is
>> >> okay [1] for binary releases), but in the "pyflink/" directory. There
>> is
>> >> also a "deps/" directory, which contains a full distribution of Flink,
>> >> without any license files.
>> >> I believe it would be a little bit nicer to have the LICENSE and NOTICE
>> >> file in the root directory (if the python wheels format permits) to
>> make
>> >> sure it is obvious that all binary release contents are covered by
>> these
>> >> files.
>> >>
>> >>
>> >> [1]
>> >>
>> http://www.apache.org/legal/release-policy.html#licensing-documentation
>> >>
>> >>
>> >>
>> >>
>> >> On Wed, Apr 29, 2020 at 11:10 AM Congxian Qiu 
>> >> wrote:
>> >>
>> >>> Thanks a lot for creating a release candidate for 1.10.1!
>> >>>
>> >>> +1 from my side
>> >>>
>> >>> checked
>> >>> - md5/gpg, ok
>> >>> - source does not contain any binaries, ok
>> >>> - pom points to the same version 1.10.1, ok
>> >>> - README file does not contain anything unexpected, ok
>> >>> - maven clean package -DskipTests, ok
>> >>> - maven clean verify, encounter a test timeout exception, but I think
>> it
>> >>> does not block the RC(have created an issue[1] to track it),
>> >>> - run demos on a stand-alone cluster, ok
>> >>>
>> >>> [1] https://issues.apache.org/jira/browse/FLINK-17458
>> >>> Best,
>> >>> Congxian
>> >>>
>> >>>
>> >>> Robert Metzger  于2020年4月29日周三 下午2:54写道:
>> >>>
>> >>>> Thanks a lot for creating a release candidate for 1.10.1!
>> >>>>
>> >>>> I'm not sure, but I think found a potential issue in the release
>> while
>> >>>> checking dependency changes on the ElasticSearch7 connector:
>> >>>>
>> >>>>
>> >>
>> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
>> >>>> In this change, "com.carrotsearch:hppc" has been added to the shaded
>> >> jar
>> >>> (
>> >>>>
>> >>
>> https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar
>> >>>> ),
>> >>>> without including proper mention of that dependency in
>> >> "META-INF/NOTICE".
>> >>

Re: chinese-translation for FLINK-16091

2020-04-30 Thread Jark Wu
Hi,

Welcome to the community!
There is no contributor permission now, you can just comment under the JIRA
issue.
And committer will assign issue to you if no one is working on this.

Best,
Jark


On Thu, 30 Apr 2020 at 17:36, flinker  wrote:

> Hi,
>
> I want to contribute to Apache Flink.
> Would you please give me the contributor permission?
> My JIRA ID is FLINK-16091 ;
> https://issues.apache.org/jira/browse/FLINK-16091.Thank you.


Re: [DISCUSS] Introduce TableFactory for StatefulSequenceSource

2020-04-30 Thread Jark Wu
Hi Konstantin,

Thanks for the link of Java Faker. It's an intereting project and
could benefit to a comprehensive datagen source.

What the discarding and printing sink look like in your thought?
1) manually create a table with a `blackhole` or `print` connector, e.g.

CREATE TABLE my_sink (
  a INT,
  b STRNG,
  c DOUBLE
) WITH (
  'connector' = 'print'
);
INSERT INTO my_sink SELECT a, b, c FROM my_source;

2) a system built-in table named `blackhole` and `print` without manually
schema work, e.g.
INSERT INTO print SELECT a, b, c, d FROM my_source;

Best,
Jark



On Thu, 30 Apr 2020 at 21:19, Konstantin Knauf  wrote:

> Hi everyone,
>
> sorry for reviving this thread at this point in time. Generally, I think,
> this is a very valuable effort. Have we considered only providing a very
> basic data generator (+ discarding and printing sink tables) in Apache
> Flink and moving a more comprehensive data generating table source to an
> ecosystem project promoted on flink-packages.org. I think this has a lot
> of
> potential (e.g. in combination with Java Faker [1]), but it would probably
> be better served in a small separately maintained repository.
>
> Cheers,
>
> Konstantin
>
> [1] https://github.com/DiUS/java-faker
>
>
> On Tue, Mar 24, 2020 at 9:10 AM Jingsong Li 
> wrote:
>
> > Hi all,
> >
> > I created https://issues.apache.org/jira/browse/FLINK-16743 for
> follow-up
> > discussion. FYI.
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Mar 24, 2020 at 2:20 PM Bowen Li  wrote:
> >
> > > I agree with Jingsong that sink schema inference and system tables can
> be
> > > considered later. I wouldn’t recommend to tackle them for the sake of
> > > simplifying user experience to the extreme. Providing the above handy
> > > source and sink implementations already offer users a ton of immediate
> > > value.
> > >
> > >
> > > On Mon, Mar 23, 2020 at 20:20 Jingsong Li 
> > wrote:
> > >
> > > > Hi Benchao,
> > > >
> > > > > do you think we need to add more columns with various types?
> > > >
> > > > I didn't list all types, but we should support primitive types,
> > varchar,
> > > > Decimal, Timestamp and etc...
> > > > This can be done continuously.
> > > >
> > > > Hi Benchao, Jark,
> > > > About console and blackhole, yes, they can have no schema, the schema
> > can
> > > > be inferred by upstream node.
> > > > - But now we don't have this mechanism to do these configurable sink
> > > > things.
> > > > - If we want to support, we need a single way to support these two
> > sinks.
> > > > - And uses can use "create table like" and others way to simplify
> DDL.
> > > >
> > > > And for providing system/registered tables (`console` and
> `blackhole`):
> > > > - I have no strong opinion on these system tables. In SQL, will be
> > > "insert
> > > > into blackhole select a /*int*/, b /*string*/ from tableA", "insert
> > into
> > > > blackhole select a /*double*/, b /*Map*/, c /*string*/ from tableB".
> It
> > > > seems that Blackhole is a universal thing, which makes me feel bad
> > > > intuitively.
> > > > - Can user override these tables? If can, we need ensure it can be
> > > > overwrite by catalog tables.
> > > >
> > > > So I think we can leave these system tables to future too.
> > > > What do you think?
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > > > On Mon, Mar 23, 2020 at 4:44 PM Jark Wu  wrote:
> > > >
> > > > > Hi Jingsong,
> > > > >
> > > > > Regarding (2) and (3), I was thinking to ignore manually DDL work,
> so
> > > > users
> > > > > can use them directly:
> > > > >
> > > > > # this will log results to `.out` files
> > > > > INSERT INTO console
> > > > > SELECT ...
> > > > >
> > > > > # this will drop all received records
> > > > > INSERT INTO blackhole
> > > > > SELECT ...
> > > > >
> > > > > Here `console` and `blackhole` are system sinks which is similar to
> > > > system
> > > > > functions.
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> 

Re: [DISCUSS]Refactor flink-jdbc connector structure

2020-04-30 Thread Jark Wu
Big +1 from my side.
The new structure and class names look nicer now.

Regarding to the compability problem, I have looked into the public APIs in
flink-jdbc, there are 3 kinds of APIs now:
1) new introduced JdbcSink for DataStream users in 1.11
2) JDBCAppendTableSink, JDBCUpsertTableSink, JDBCTableSource are introduced
since 1.9
3) very ancient JDBCOutputFormat and JDBCInputFormat

For (1), as it's an un-released API, so I think it's safe to move to new
package. cc @Khachatryan Roman  who
contributed this.
For (2), because TableSource and TableSink are not designed to be accessed
by users since 1.11, so I think it's fine to move them.
For (3), I'm not sure how many users are still using these out-of-date
classes.
But I think it's fine to keep them for one more version, and drop them in
the next version.


Best,
Jark

On Thu, 30 Apr 2020 at 22:57, Flavio Pompermaier 
wrote:

> Very big +1 from me
>
> Best,
> Flavio
>
> On Thu, Apr 30, 2020 at 4:47 PM David Anderson 
> wrote:
>
> > I'm very happy to see the jdbc connector being normalized in this way. +1
> > from me.
> >
> > David
> >
> > On Thu, Apr 30, 2020 at 2:14 PM Timo Walther  wrote:
> >
> > > Hi Leonard,
> > >
> > > this sounds like a nice refactoring for consistency. +1 from my side.
> > >
> > > However, I'm not sure how much backwards compatibility is required.
> > > Maybe others can comment on this.
> > >
> > > Thanks,
> > > Timo
> > >
> > > On 30.04.20 14:09, Leonard Xu wrote:
> > > > Hi, dear community
> > > >
> > > > Recently, I’m thinking to refactor the flink-jdbc connector structure
> > > before release 1.11.
> > > > After the refactor, in the future,  we can easily introduce unified
> > > pluggable JDBC dialect for Table and DataStream, and we can have a
> better
> > > module organization and implementations.
> > > >
> > > > So, I propose following changes:
> > > > 1) Use `Jdbc` instead of `JDBC` in the new public API and interface
> > > name. The Datastream API `JdbcSink` which imported in this version has
> > > followed this standard.
> > > >
> > > > 2) Move all interface and classes from `org.apache.flink.java.io
> > .jdbc`(old
> > > package) to `org.apache.flink.connector.jdbc`(new package) to follow
> the
> > > base connector path in FLIP-27.
> > > > I think we can move JDBC TableSource, TableSink and factory from old
> > > package to new package because TableEnvironment#registerTableSource、
> > > TableEnvironment#registerTableSink  will be removed in 1.11 ans these
> > > classes are not exposed to users[1].
> > > > We can move Datastream API JdbcSink from old package to new package
> > > because it’s  introduced in this version.
> > > > We will still keep `JDBCInputFormat` and `JDBCOoutoutFormat` in old
> > > package and deprecate them.
> > > > Other classes/interfaces are internal used and we can move to new
> > > package without breaking compatibility.
> > > > 3) Rename `flink-jdbc` to `flink-connector-jdbc`. well, this is a
> > > compatibility broken change but in order to comply with other
> connectors
> > > and it’s real a connector rather than a flink-jdc-driver[2] we’d better
> > > decide do it ASAP.
> > > >
> > > >
> > > > What do you think? Any feedback is appreciate.
> > > >
> > > >
> > > > Best,
> > > > Leonard Xu
> > > >
> > > > [1]
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Remove-registration-of-TableSource-TableSink-in-Table-Env-and-ConnectTableDescriptor-td37270.html
> > > <
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Remove-registration-of-TableSource-TableSink-in-Table-Env-and-ConnectTableDescriptor-td37270.html
> > > >
> > > > [2]https://github.com/ververica/flink-jdbc-driver <
> > > https://github.com/ververica/flink-jdbc-driver>
> > > >
> > > >
> > >
> > >
> >
>


Re: [DISCUSS] FLIP-84 Feedback Summary

2020-04-30 Thread Jark Wu
1.04.20 09:31, godfrey he wrote:
> > > >>>>>>>>>> Hi, Timo & Dawid,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks so much for the effort of `multiline statements
> > > >> supporting`,
> > > >>>>>>>>>> I have a few questions about this method:
> > > >>>>>>>>>>
> > > >>>>>>>>>> 1. users can well control the execution logic through the
> > > >> proposed
> > > >>>>>> method
> > > >>>>>>>>>>  if they know what the statements are (a statement is a
> > > DDL, a
> > > >>>> DML
> > > >>>>>> or
> > > >>>>>>>>>> others).
> > > >>>>>>>>>> but if a statement is from a file, that means users do not
> > know
> > > >>>> what
> > > >>>>>> the
> > > >>>>>>>>>> statements are,
> > > >>>>>>>>>> the execution behavior is unclear.
> > > >>>>>>>>>> As a platform user, I think this method is hard to use,
> unless
> > > >> the
> > > >>>>>>>>> platform
> > > >>>>>>>>>> defines
> > > >>>>>>>>>> a set of rule about the statements order, such as: no select
> > in
> > > >> the
> > > >>>>>>>>> middle,
> > > >>>>>>>>>> dml must be at tail of sql file (which may be the most case
> in
> > > >>>> product
> > > >>>>>>>>>> env).
> > > >>>>>>>>>> Otherwise the platform must parse the sql first, then know
> > what
> > > >> the
> > > >>>>>>>>>> statements are.
> > > >>>>>>>>>> If do like that, the platform can handle all cases through
> > > >>>>>> `executeSql`
> > > >>>>>>>>> and
> > > >>>>>>>>>> `StatementSet`.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 2. SQL client can't also use `executeMultilineSql` to
> supports
> > > >>>>>> multiline
> > > >>>>>>>>>> statements,
> > > >>>>>>>>>>  because there are some special commands introduced in
> SQL
> > > >>>> client,
> > > >>>>>>>>>> such as `quit`, `source`, `load jar` (not exist now, but
> maybe
> > > we
> > > >>>> need
> > > >>>>>>>>> this
> > > >>>>>>>>>> command
> > > >>>>>>>>>>  to support dynamic table source and udf).
> > > >>>>>>>>>> Does TableEnvironment also supports those commands?
> > > >>>>>>>>>>
> > > >>>>>>>>>> 3. btw, we must have this feature in release-1.11? I find
> > there
> > > >> are
> > > >>>>>> few
> > > >>>>>>>>>> user cases
> > > >>>>>>>>>>  in the feedback document which behavior is unclear now.
> > > >>>>>>>>>>
> > > >>>>>>>>>> regarding to "change the return value from `Iterable > > >>>>>>>>>> `Iterator > > >>>>>>>>>> I couldn't agree more with this change. Just as Dawid
> > mentioned
> > > >>>>>>>>>> "The contract of the Iterable#iterator is that it returns a
> > new
> > > >>>>>> iterator
> > > >>>>>>>>>> each time,
> > > >>>>>>>>>>  which effectively means we can iterate the results
> > multiple
> > > >>>>>> times.",
> > > >>>>>>>>>> we does not provide iterate the results multiple times.
> > > >>>>>>>>>> If we want do that, the client must buffer a

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-30 Thread Jark Wu
he
> origin/destination of these usually will be external catalog, usually in
>
> a
>
> flattened(key/value) representation so I agree it is not as important as
>
> in
>
> the aforementioned case. Nevertheless having a yaml based catalog or
>
> being
>
> able to have e.g. yaml based snapshots of a catalog in my opinion is
> appealing. At the same time cost of being able to have a nice
> yaml/hocon/json representation is just adding a single suffix to a
> single(at most 2 key + value) property. The question is between `format`
>
> =
>
> `json` vs `format.kind` = `json`. That said I'd be slighty in favor of
> doing it.
>
> Just to have a full picture. Both cases can be represented in yaml, but
> the difference is significant:
> format: 'json'
> format.option: 'value'
>
> vs
> format:
> kind: 'json'
>
> option: 'value'
>
> Best,
> Dawid
>
> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>
> Personally I don't have any preference here.  Compliance wih standard
>
> YAML
>
> parser is probably more important
>
> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu   
> wrote:
>
>
> From a user's perspective, I prefer the shorter one "format=json",
>
> because
>
> it's more concise and straightforward. The "kind" is redundant for
>
> users.
>
> Is there a real case requires to represent the configuration in JSON
> style?
> As far as I can see, I don't see such requirement, and everything works
> fine by now.
>
> So I'm in favor of "format=json". But if the community insist to follow
> code style on this, I'm also fine with the longer one.
>
> Btw, I also CC user mailing list to listen more user's feedback.
>
> Because I
>
> think this is relative to usability.
>
> Best,
> Jark
>
> On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler  
> 
> wrote:
>
>
>  > Therefore, should we advocate instead:
>  >
>  > 'format.kind' = 'json',
>  > 'format.fail-on-missing-field' = 'false'
>
> Yes. That's pretty much it.
>
> This is reasonable important to nail down as with such violations I
> believe we could not actually switch to a standard YAML parser.
>
> On 29/04/2020 16:05, Timo Walther wrote:
>
> Hi everyone,
>
> discussions around ConfigOption seem to be very popular recently.
>
> So I
>
> would also like to get some opinions on a different topic.
>
> How do we represent hierarchies in ConfigOption? In FLIP-122, we
> agreed on the following DDL syntax:
>
> CREATE TABLE fs_table (
>  ...
> ) WITH (
>  'connector' = 'filesystem',
>  'path' = 'file:///path/to/whatever',
>  'format' = 'csv',
>  'format.allow-comments' = 'true',
>  'format.ignore-parse-errors' = 'true'
> );
>
> Of course this is slightly different from regular Flink core
> configuration but a connector still needs to be configured based on
> these options.
>
> However, I think this FLIP violates our code style guidelines
>
> because
>
> 'format' = 'json',
> 'format.fail-on-missing-field' = 'false'
>
> is an invalid hierarchy. `format` cannot be a string and a top-level
> object at the same time.
>
> We have similar problems in our runtime configuration:
>
> state.backend=
> state.backend.incremental=
> restart-strategy=
> restart-strategy.fixed-delay.delay=
> high-availability=
> high-availability.cluster-id=
>
> The code style guide states "Think of the configuration as nested
> objects (JSON style)". So such hierarchies cannot be represented in
>
> a
>
> nested JSON style.
>
> Therefore, should we advocate instead:
>
> 'format.kind' = 'json',
> 'format.fail-on-missing-field' = 'false'
>
> What do you think?
>
> Thanks,
> Timo
>
> [1]
>
>
> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking 
> UniversityTel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: [DISCUSS] Send issue and pull request notifications for flink-web and flink-shaded to iss...@flink.apache.org

2020-05-04 Thread Jark Wu
Big +1 to this.

Best,
Jark

On Mon, 4 May 2020 at 23:44, Till Rohrmann  wrote:

> Hi everyone,
>
> due to some changes on the ASF side, we are now seeing issue and pull
> request notifications for the flink-web [1] and flink-shaded [2] repo on
> dev@flink.apache.org. I think this is not ideal since the dev ML is much
> more noisy now.
>
> I would propose to send these notifications to iss...@flink.apache.org as
> we are currently doing it for the Flink main repo [3].
>
> What do you think?
>
> [1] https://github.com/apache/flink-web
> [2] https://github.com/apache/flink-shaded
> [3] https://gitbox.apache.org/schemes.cgi?flink
>
> Cheers,
> Till
>


Re: Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation

2020-05-04 Thread Jark Wu
Hi Andrey,

Thanks for the tuning ideas. I will explain the design of deduplication.

The mini-batch implementation of deduplication buffers a bundle of input
data in heap (Java Map),
when the bundle size hit the trigger size or trigger time, the buffered
data will be processed together.
So we only need to access the state once per key. This is designed for
rocksdb statebackend to reduce the
frequently accessing, (de)serialization. And yes, this may slow down the
checkpoint, but the suggested
mini-batch timeout is <= 10s. From our production experience, it doesn't
have much impact on checkpoint.

Best,
Jark

On Tue, 5 May 2020 at 06:48, Andrey Zagrebin  wrote:

> Hi lsyldliu,
>
> You can try to tune the StateTtlConfig. As the documentation suggests [1]
> the TTL incremental cleanup can decrease the per record performance. This
> is the price of the automatic cleanup.
> If the only thing, which happens mostly in your operator, is working with
> state then even checking one additional record to cleanup is two times more
> actions to do.
> Timer approach was discussed in TTL feature design. It needs an additional
> implementation and keeps more state but performs only one cleanup action
> exactly when needed so it is a performance/storage trade-off.
>
> Anyways, 20x degradation looks indeed a lot.
> As a first step, I would suggest to configure the incremental cleanup
> explicitly in `StateTtlConfigUtil#createTtlConfig` with a less entries to
> check, e.g. 1 because processFirstRow/processLastRow already access the
> state twice and do cleanup:
>
> .cleanupIncrementally(1, false)
>
>
> Also not sure but depending on the input data, finishBundle can happen
> mostly during the snapshotting which slows down taking the checkpoint.
> Could this fail the checkpoint accumulating the backpressure and slowing
> down the pipeline?
>
> Not sure why to keep the deduplication data in a Java map and in Flink
> state at the same time, why not to keep it only in Flink state and
> deduplicate on each incoming record?
>
> Best,
> Andrey
>
> [1] note 2 in
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#incremental-cleanup
>
> On Wed, Apr 29, 2020 at 11:53 AM 刘大龙  wrote:
>
> >
> >
> >
> > > -原始邮件-
> > > 发件人: "Jark Wu" 
> > > 发送时间: 2020-04-29 14:09:44 (星期三)
> > > 收件人: dev , "Yu Li" ,
> > myas...@live.com
> > > 抄送: azagre...@apache.org
> > > 主题: Re: The use of state ttl incremental cleanup strategy in sql
> > deduplication resulting in significant performance degradation
> > >
> > > Hi lsyldliu,
> > >
> > > Thanks for investigating this.
> > >
> > > First of all, if you are using mini-batch deduplication, it doesn't
> > support
> > > state ttl in 1.9. That's why the tps looks the same with 1.11 disable
> > state
> > > ttl.
> > > We just introduce state ttl for mini-batch deduplication recently.
> > >
> > > Regarding to the performance regression, it looks very surprise to me.
> > The
> > > performance is reduced by 19x when StateTtlConfig is enabled in 1.11.
> > > I don't have much experience of the underlying of StateTtlConfig. So I
> > loop
> > > in @Yu Li  @YunTang in CC who may have more insights
> > on
> > > this.
> > >
> > > For more information, we use the following StateTtlConfig [1] in blink
> > > planner:
> > >
> > > StateTtlConfig
> > >   .newBuilder(Time.milliseconds(retentionTime))
> > >   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> > >
>  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> > >   .build();
> > >
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > [1]:
> > >
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27
> > >
> > >
> > >
> > >
> > >
> > > On Wed, 29 Apr 2020 at 11:53, 刘大龙  wrote:
> > >
> > > > Hi, all!
> > > >
> > > > At flink master branch, we have supported state ttl  for sql
> mini-batch
> > > > deduplication using incremental cleanup strategy on heap backend,
> > refer to
> > > > FLINK-16581. Because I want to test the performance of this feature,
> > so I
> > > > compile master branch code and deploy the jar to production
> > > > environment,then run thre

Re: [DISCUSS] Hierarchies in ConfigOption

2020-05-06 Thread Jark Wu
Hi,

I think Timo proposed a good idea to make both side happy. That is:

format = json
json.fail-on-missing-field = true
json.ignore-parse-error = true

value.format = json
value.json.fail-on-missing-field = true
value.json.ignore-parse-error = true

This is a valid hierarchies. Besides, it's more clear that the option
belongs to a specific component (i.e. json).
This will be more readable when we introducing more formats, e.g. parquet.

format = parquet
parquet.compression = ...
parquet.block.size = ...
parquet.page.size = ...

is more readable than current style:

format = parquet
format.compression = ...
format.block.size = ...
format.page.size = ...

To sum up, I'm +1 to use "format = json",  "json.fail-on-missing-field =
true".

Best,
Jark

On Wed, 6 May 2020 at 17:12, Danny Chan  wrote:

> Hi, everyone ~
>
> Allows me to share some thoughts here.
>
> Personally i think for SQL, "format" is obviously better than "format.name",
> it is more concise and straight-forward, similar with Presto FORMAT[2] and
> KSQL VALUE_FORMAT[1]; i think we move from "connector.type" to "connector"
> for the same reason, the "type" or "name" suffix is implicit, SQL syntax
> like the DDL is a top-level user API, so from my side keeping good
> user-friendly syntax is more important.
>
> @Timo I'm big +1 for the a good code style guide, but that does not mean
> we should go for a json-style table options in the DDL, the DDL could have
> its own contract. Can we move "represent these config options in YAML" to
> another topic ? Otherwise, how should we handle the "connector" key, should
> we prefix all the table options with "connector" ? The original inention of
> FLIP-122 is to remove some redundant prefix/suffix of the table options
> because they are obviously implicit there, and the "connector." prefix and
> the ".type" or ".name" suffix are the ones we most want to delete.
>
> @Dawid Although ".type" is just another 4 characters, but we force the SQL
> users to do the thing that is obvious reduadant, i know serialize catalog
> table to YAML or use the options in DataStream has similar keys request,
> but they are different use cases that i believe many SQL user would not
> encounter, that means we force many users to obey rules for cases they
> would never have.
>
>
> [1] https://docs.ksqldb.io/en/latest/developer-guide/create-a-table/
> [2] https://prestodb.io/docs/current/sql/create-table.html
>
> Best,
> Danny Chan
> 在 2020年5月4日 +0800 PM11:34,Till Rohrmann ,写道:
> > Hi everyone,
> >
> > I like Timo's proposal to organize our configuration more hierarchical
> > since this is what the coding guide specifies. The benefit I see is that
> > config options belonging to the same concept will be found in the same
> > nested object. Moreover, it will be possible to split the configuration
> > into unrelated parts which are fed to the respective components. That way
> > one has a much better separation of concern since component A cannot read
> > the configuration of component B.
> >
> > Concerning Timo's last two proposals:
> >
> > If fail-on-missing-field is a common configuration shared by all formats,
> > then I would go with the first option:
> >
> > format.kind: json
> > format.fail-on-missing-field: true
> >
> > If fail-on-missing-field is specific for json, then one could go with
> >
> > format: json
> > json.fail-on-missing-field: true
> >
> > or
> >
> > format.kind: json
> > format.json.fail-on-missing-field: true
> >
> > Cheers,
> > Till
> >
> >
> > On Fri, May 1, 2020 at 11:55 AM Timo Walther  wrote:
> >
> > > Hi Jark,
> > >
> > > yes, in theory every connector can design options as they like. But for
> > > user experience and good coding style we should be consistent in Flink
> > > connectors and configuration. Because implementers of new connectors
> > > will copy the design of existing ones.
> > >
> > > Furthermore, I could image that people in the DataStream API would also
> > > like to configure their connector based on options in the near future.
> > > It might be the case that Flink DataStream API connectors will reuse
> the
> > > ConfigOptions from Table API for consistency.
> > >
> > > I'm favoring either:
> > >
> > > format.kind = json
> > > format.fail-on-missing-field: true
> > >
> > > Or:
> > >
> > > format = json
> > > json.fail-on

Re: [DISCUSS] Hierarchies in ConfigOption

2020-05-06 Thread Jark Wu
Thanks all for the discussion, I have updated FLIP-105 and FLIP-122 to use
the new format option keys.

FLIP-105:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289
FLIP-122:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

Best,
Jark

On Wed, 6 May 2020 at 20:37, Timo Walther  wrote:

> Cool, so let's go with:
>
> format = json
> json.fail-on-missing-field = true
> json.ignore-parse-error = true
>
> value.format = json
> value.json.fail-on-missing-field = true
> value.json.ignore-parse-error = true
>
> Regards,
> Timo
>
>
> On 06.05.20 12:39, Jingsong Li wrote:
> > Hi,
> >
> > +1 to:
> > format = parquet
> > parquet.compression = ...
> > parquet.block.size = ...
> > parquet.page.size = ...
> >
> > For the formats like parquet and orc,
> > Not just Flink itself, but this way also let Flink keys compatible with
> the
> > property keys of Hadoop / Hive / Spark.
> >
> > And like Jark said, this way works for Kafka key value too.
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, May 6, 2020 at 6:19 PM Jark Wu  wrote:
> >
> >> Hi,
> >>
> >> I think Timo proposed a good idea to make both side happy. That is:
> >>
> >> format = json
> >> json.fail-on-missing-field = true
> >> json.ignore-parse-error = true
> >>
> >> value.format = json
> >> value.json.fail-on-missing-field = true
> >> value.json.ignore-parse-error = true
> >>
> >> This is a valid hierarchies. Besides, it's more clear that the option
> >> belongs to a specific component (i.e. json).
> >> This will be more readable when we introducing more formats, e.g.
> parquet.
> >>
> >> format = parquet
> >> parquet.compression = ...
> >> parquet.block.size = ...
> >> parquet.page.size = ...
> >>
> >> is more readable than current style:
> >>
> >> format = parquet
> >> format.compression = ...
> >> format.block.size = ...
> >> format.page.size = ...
> >>
> >> To sum up, I'm +1 to use "format = json",  "json.fail-on-missing-field =
> >> true".
> >>
> >> Best,
> >> Jark
> >>
> >> On Wed, 6 May 2020 at 17:12, Danny Chan  wrote:
> >>
> >>> Hi, everyone ~
> >>>
> >>> Allows me to share some thoughts here.
> >>>
> >>> Personally i think for SQL, "format" is obviously better than "
> >> format.name",
> >>> it is more concise and straight-forward, similar with Presto FORMAT[2]
> >> and
> >>> KSQL VALUE_FORMAT[1]; i think we move from "connector.type" to
> >> "connector"
> >>> for the same reason, the "type" or "name" suffix is implicit, SQL
> syntax
> >>> like the DDL is a top-level user API, so from my side keeping good
> >>> user-friendly syntax is more important.
> >>>
> >>> @Timo I'm big +1 for the a good code style guide, but that does not
> mean
> >>> we should go for a json-style table options in the DDL, the DDL could
> >> have
> >>> its own contract. Can we move "represent these config options in YAML"
> to
> >>> another topic ? Otherwise, how should we handle the "connector" key,
> >> should
> >>> we prefix all the table options with "connector" ? The original
> inention
> >> of
> >>> FLIP-122 is to remove some redundant prefix/suffix of the table options
> >>> because they are obviously implicit there, and the "connector." prefix
> >> and
> >>> the ".type" or ".name" suffix are the ones we most want to delete.
> >>>
> >>> @Dawid Although ".type" is just another 4 characters, but we force the
> >> SQL
> >>> users to do the thing that is obvious reduadant, i know serialize
> catalog
> >>> table to YAML or use the options in DataStream has similar keys
> request,
> >>> but they are different use cases that i believe many SQL user would not
> >>> encounter, that means we force many users to obey rules for cases they
> >>> would never have.
> >>>
> >>>
> >>> [1] https://docs.ksqldb.io/en/latest/developer-guide/create-a-table/
> >>> [2] https://prestodb.io/docs/current/sql/create-table.html
>

Re: set a retract switch

2020-05-06 Thread Jark Wu
Hi Lec,

You can use `StreamTableEnvironment#toRetractStream(table, Row.class)` to
get a `DataStream>`.
The true Boolean flag indicates an add message, a false flag indicates a
retract (delete) message. So you can just simply apply
 a flatmap function after this to ignore the false messages. Then you can
get a pure UPSERT stream.

Btw, such question should be posted in u...@flink.apache.org, not the dev
mailing list.

Best,
Jark

On Thu, 7 May 2020 at 10:07, lec ssmi  wrote:

> Hi:
>  During the execution of flink, especially the sql API, many operations
> in DataStream are not available. In many cases, we don't care about the
> DELETE record when retracting. Is it possible to set a switch so that the
> DELETE record when retracting is not processed? In other words, the
> downstream only receives a value after UPDATE, and does not need to receive
> the value before UPDATE. In some programming modes, processing DELETE
> records actually makes the logic more complicated.
>
> Best
> Lec Ssmi
>


Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-05-07 Thread Jark Wu
Hi,

I agree what Fabian said above.
Besides, IMO, (3) is in a lower priority and will involve much more things.
It makes sense to me to do it in two-phase.

Regarding to (3), the key point to convert an append-only table into
changelog table is that the framework should know the operation type,
so we introduced a special CREATE VIEW syntax to do it in the documentation
[1]. Here is an example:

-- my_binlog table is registered as an append-only table
CREATE TABLE my_binlog (
  before ROW<...>,
  after ROW<...>,
  op STRING,
  op_ms TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  ...
);

-- interpret my_binlog as a changelog on the op_type and id key
CREATE VIEW my_table AS
  SELECT
after.*
  FROM my_binlog
  CHANGELOG OPERATION BY op
  UPDATE KEY BY (id);

-- my_table will materialize the insert/delete/update changes
-- if we have 4 records in dbz that
-- a create for 1004
-- an update for 1004
-- a create for 1005
-- a delete for 1004
> SELECT COUNT(*) FROM my_table;
+---+
|  COUNT(*) |
+---+
| 1 |
+---+

Best,
Jark

[1]:
https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb


On Fri, 8 May 2020 at 00:24, Fabian Hueske  wrote:

> Thanks for the summary Konstantin.
> I think you got all points right.
>
> IMO, the way forward would be to work on a FLIP to define
> * the concept of temporal tables,
> * how to feed them from retraction tables
> * how to feed them from append-only tables
> * their specification with CREATE TEMPORAL TABLE,
> * how to use temporal tables in temporal table joins
> * how (if at all) to use temporal tables in other types of queries
>
> We would keep the LATERAL TABLE syntax because it used for regular
> table-valued functions.
> However, we would probably remove the TemporalTableFunction (which is a
> built-in table-valued function) after we deprecated it for a while.
>
> Cheers, Fabian
>
> Am Do., 7. Mai 2020 um 18:03 Uhr schrieb Konstantin Knauf <
> kna...@apache.org>:
>
>> Hi everyone,
>>
>> Thanks everyone for joining the discussion on this. Please let me
>> summarize
>> what I have understood so far.
>>
>> 1) For joining an append-only table and a temporal table the syntax the
>> "FOR
>> SYSTEM_TIME AS OF " seems to be preferred (Fabian, Timo,
>> Seth).
>>
>> 2) To define a temporal table based on a changelog stream from an external
>> system CREATE TEMPORAL TABLE (as suggested by Timo/Fabian) could be used.
>> 3) In order to also support temporal tables derived from an append-only
>> stream, we either need to support TEMPORAL VIEW (as mentioned by Fabian)
>> or
>> need to have a way to convert an append-only table into a changelog table
>> (briefly discussed in [1]). It is not completely clear to me how a
>> temporal
>> table based on an append-only table would be with the syntax proposed in
>> [1] and 2). @Jark Wu  could you elaborate a bit on
>> that?
>>
>> How do we move forward with this?
>>
>> * It seems that a two-phased approach (1 + 2 now, 3 later) makes sense.
>> What do you think? * If we proceed like this, what would this mean for the
>> current syntax of LATERAL TABLE? Would we keep it? Would we eventually
>> deprecate and drop it? Since only after 3) we would be on par with the
>> current temporal table function join, I assume, we could only drop it
>> thereafter.
>>
>> Thanks, Konstantin
>>
>> [1]
>>
>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.kduaw9moein6
>>
>>
>> On Sat, Apr 18, 2020 at 3:07 PM Jark Wu  wrote:
>>
>> > Hi Fabian,
>> >
>> > Just to clarify a little bit, we decided to move the "converting
>> > append-only table into changelog table" into future work.
>> > So FLIP-105 only introduced some CDC formats (debezium) and new
>> TableSource
>> > interfaces proposed in FLIP-95.
>> > I should have started a new FLIP for the new CDC formats and keep
>> FLIP-105
>> > as it is to avoid the confusion, sorry about that.
>> >
>> > Best,
>> > Jark
>> >
>> >
>> > On Sat, 18 Apr 2020 at 00:35, Fabian Hueske  wrote:
>> >
>> > > Thanks Jark!
>> > >
>> > > I certainly need to read up on FLIP-105 (and I'll try to adjust my
>> > > terminology to changelog table from now on ;-) )
>> > > If FLIP-105 addresses the issue of converting an append-only table
>> into a
>> > > changelog table that upserts on primary key (basically what 

Re: [DISCUSS] Align the behavior of internal return result of MapState#entries, keys, values and iterator.

2020-05-09 Thread Jark Wu
+1 to return emty iterator and align the implementations.

Best,
Jark

On Sat, 9 May 2020 at 19:18, SteNicholas  wrote:

> Hi Tang Yun,
>  I agree with the point you mentioned that align these internal
> behavior
> to return empty iterator instead of null. In my opinion,
> StateMapViewWithKeysNullable handle nullable map keys, and result of
> internal map state should be empty map in the null behavior case.
> Therefore,
> as you mentioned, #iterator() should better return empty iterator.
>
> Thanks,
> Nicholas Jiang
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>


Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

2020-05-09 Thread Jark Wu
Hi,

Regarding to the `open()/close()`, I think it's necessary for Table&SQL to
compile the generated code.
In Table&SQL, the watermark strategy and event-timestamp is defined using
SQL expressions, we will
translate and generate Java code for the expressions. If we have
`open()/close()`, we don't need lazy initialization.
Besides that, I can see a need to report some metrics, e.g. the current
watermark, the dirty timestamps (null value), etc.
So I think a simple `open()/close()` with a context which can get
MetricGroup is nice and not complex for the first version.

Best,
Jark



On Sun, 10 May 2020 at 00:50, Stephan Ewen  wrote:

> Thanks, Aljoscha, for picking this up.
>
> I agree with the approach of doing the here proposed set of changes for
> now. It already makes things simpler and adds idleness support everywhere.
>
> Rich functions and state always add complexity, let's do this in a next
> step, if we have a really compelling case.
>
>
> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek 
> wrote:
>
> > Regarding the WatermarkGenerator (WG) interface itself. The proposal is
> > basically to turn emitting into a "flatMap", we give the
> > WatermarkGenerator a "collector" (the WatermarkOutput) and the WG can
> > decide whether to output a watermark or not and can also mark the output
> > as idle. Changing the interface to return a Watermark (as the previous
> > watermark assigner interface did) would not allow that flexibility.
> >
> > Regarding checkpointing the watermark and keeping track of the minimum
> > watermark, this would be the responsibility of the framework (or the
> > KafkaConsumer in the current implementation). The user-supplied WG does
> > not need to make sure the watermark doesn't regress.
> >
> > Regarding making the WG a "rich function", I can see the potential
> > benefit but I also see a lot of pitfalls. For example, how should the
> > watermark state be handled in the case of scale-in? It could be made to
> > work in the Kafka case by attaching the state to the partition state
> > that we keep, but then we have potential backwards compatibility
> > problems also for the WM state. Does the WG usually need to keep the
> > state or might it be enough if the state is transient, i.e. if you have
> > a restart the WG would loose its histogram but it would rebuild it
> > quickly and you would get back to the same steady state as before.
> >
> > Best,
> > Aljoscha
> >
> > On 27.04.20 12:12, David Anderson wrote:
> > > Overall I like this proposal; thanks for bringing it forward, Aljoscha.
> > >
> > > I also like the idea of making the Watermark generator a rich function
> --
> > > this should make it more straightforward to implement smarter watermark
> > > generators. Eg, one that uses state to keep statistics about the actual
> > > out-of-orderness, and uses those statistics to implement a variable
> > delay.
> > >
> > > David
> > >
> > > On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas 
> > wrote:
> > >
> > >> Hi Aljoscha,
> > >>
> > >> Thanks for opening the discussion!
> > >>
> > >> I have two comments on the FLIP:
> > >> 1) we could add lifecycle methods to the Generator, i.e. open()/
> > >> close(), probably with a Context as argument: I have not fully thought
> > >> this through but I think that this is more aligned with the rest of
> > >> our rich functions. In addition, it will allow, for example, to
> > >> initialize the Watermark value, if we decide to checkpoint the
> > >> watermark (see [1]) (I also do not know if Table/SQL needs to do
> > >> anything in the open()).
> > >> 2) aligned with the above, and with the case where we want to
> > >> checkpoint the watermark in mind, I am wondering about how we could
> > >> implement this in the future. In the FLIP, it is proposed to expose
> > >> the WatermarkOutput in the methods of the WatermarkGenerator. Given
> > >> that there is the implicit contract that watermarks are
> > >> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
> > >> assume) a check that will compare the last emitted WM against the
> > >> provided one, and emit it only if it is >=. If not, then we risk
> > >> having the user shooting himself on the foot if he/she accidentally
> > >> forgets the check. Given that the WatermarkGenerator and its caller do
> > >> not know if the watermark was finally emitted or not (the
> > >> WatermarkOutput#emitWatermark returns void), who will be responsible
> > >> for checkpointing the WM?
> > >>
> > >> Given this, why not having the methods as:
> > >>
> > >> public interface WatermarkGenerator {
> > >>
> > >>  Watermark onEvent(T event, long eventTimestamp, WatermarkOutput
> > >> output);
> > >>
> > >>  Watermark onPeriodicEmit(WatermarkOutput output);
> > >> }
> > >>
> > >> and the caller will be the one enforcing any invariants, such as
> > >> non-decreasing watermarks. In this way, the caller can checkpoint
> > >> anything that is needed as it will have complete knowledge as to if
> > >> the WM was em

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

2020-05-11 Thread Jark Wu
Thanks for the explanation. I like the fatory pattern to make the member
variables immutable and final.

So +1 to the proposal.

Best,
Jark

On Mon, 11 May 2020 at 22:01, Stephan Ewen  wrote:

> I am fine with that.
>
> Much of the principles seem agreed upon. I understand the need to support
> code-generated extractors and we should support most of it already (as
> Aljoscha mentioned via the factories) can extend this if needed.
>
> I think that the factory approach supports code-generated extractors in a
> cleaner way even than an extractor with an open/init method.
>
>
> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek 
> wrote:
>
> > We're slightly running out of time. I would propose we vote on the basic
> > principle and remain open to later additions. This feature is quite
> > important to make the new Kafka Source that is developed as part of
> > FLIP-27 useful. Otherwise we would have to use the legacy interfaces in
> > the newly added connector.
> >
> > I know that's a bit unorthodox but would everyone be OK with what's
> > currently there and then we iterate?
> >
> > Best,
> > Aljoscha
> >
> > On 11.05.20 13:57, Aljoscha Krettek wrote:
> > > Ah, I meant to write this in my previous email, sorry about that.
> > >
> > > The WatermarkStrategy, which is basically a factory for a
> > > WatermarkGenerator is the replacement for the open() method. This is
> the
> > > same strategy that was followed for StreamOperatorFactory, which was
> > > introduced to allow code generation in the Table API [1]. If we need
> > > metrics or other things we would add that as a parameter to the factory
> > > method. What do you think?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-11974
> > >
> > > On 10.05.20 05:07, Jark Wu wrote:
> > >> Hi,
> > >>
> > >> Regarding to the `open()/close()`, I think it's necessary for
> > >> Table&SQL to
> > >> compile the generated code.
> > >> In Table&SQL, the watermark strategy and event-timestamp is defined
> > using
> > >> SQL expressions, we will
> > >> translate and generate Java code for the expressions. If we have
> > >> `open()/close()`, we don't need lazy initialization.
> > >> Besides that, I can see a need to report some metrics, e.g. the
> current
> > >> watermark, the dirty timestamps (null value), etc.
> > >> So I think a simple `open()/close()` with a context which can get
> > >> MetricGroup is nice and not complex for the first version.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >>
> > >>
> > >> On Sun, 10 May 2020 at 00:50, Stephan Ewen  wrote:
> > >>
> > >>> Thanks, Aljoscha, for picking this up.
> > >>>
> > >>> I agree with the approach of doing the here proposed set of changes
> for
> > >>> now. It already makes things simpler and adds idleness support
> > >>> everywhere.
> > >>>
> > >>> Rich functions and state always add complexity, let's do this in a
> next
> > >>> step, if we have a really compelling case.
> > >>>
> > >>>
> > >>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <
> aljos...@apache.org>
> > >>> wrote:
> > >>>
> > >>>> Regarding the WatermarkGenerator (WG) interface itself. The proposal
> > is
> > >>>> basically to turn emitting into a "flatMap", we give the
> > >>>> WatermarkGenerator a "collector" (the WatermarkOutput) and the WG
> can
> > >>>> decide whether to output a watermark or not and can also mark the
> > >>>> output
> > >>>> as idle. Changing the interface to return a Watermark (as the
> previous
> > >>>> watermark assigner interface did) would not allow that flexibility.
> > >>>>
> > >>>> Regarding checkpointing the watermark and keeping track of the
> minimum
> > >>>> watermark, this would be the responsibility of the framework (or the
> > >>>> KafkaConsumer in the current implementation). The user-supplied WG
> > does
> > >>>> not need to make sure the watermark doesn't regress.
> > >>>>
> > >>>> Regarding making the WG a "rich function&quo

Re: Discussion on Project Idea for Session Of docs 2020

2020-05-20 Thread Jark Wu
Hi Divya,

I think you may not subscribe the dev mailing list, that's why you can't
see Seth's reply.
You can subscribe dev mailing list by sending email to
dev-subscr...@flink.apache.org

I copied Seth's reply here, hope it can help you:

=

Hi Divya,

Thrilled to see your interest in the project. We are looking to work with
someone to expand Flink SQL's reach. What we proposed on the blog is our
initial idea's but we are open to any suggestions or modifications on the
proposal if you have ideas about how it could be improved. I am happy to
answer any specific questions, but otherwise, the first step is to apply
via the GSoD website[1].

Seth

[1] https://developers.google.com/season-of-docs/docs/tech-writer-guide



On Wed, 20 May 2020 at 17:13, Divya Sanghi 
wrote:

> Follow up ,can anyone please reply
>
> On Tue, 19 May 2020, 18:02 Divya Sanghi, 
> wrote:
>
> > Hello Aljoscha , Sjwiesman
> >
> > I am working on Big Data technologies and have hands-on experience on
> > Flink, Spark, Kafka.
> >
> > These are some tasks done by me:
> > 1. I did POC where I created a Docker image of Fink job and ran it on the
> > K8S cluster on the local machine.
> > Attaching my POC project: https://github.com/sanghisha145/flink_on_k8s
> >
> > 2. Even working on writing FlinkKafkaConsumer Job with autoscaling
> > feature to reduce consumer lag
> >
> > I really find this project " Restructure the Table API & SQL
> Documentation
> >  " interesting and feel that I can contribute whole-heartedly to
> > documentation on it.
> >
> > Let me know where I can start?
> >
> > PS:  I have not written any open documentation but has written many for
> my
> > organization(created many technical articles on confluence page)
> >
> > Thanks
> > Divya
> >
>


Re: Interested In Google Season of Docs!

2020-05-20 Thread Jark Wu
Hi Roopal,

I think you may not subscribe the dev mailing list, that's why you can't
see Seth's reply.
You can subscribe dev mailing list by sending email to
dev-subscr...@flink.apache.org

I copied Seth's reply here, hope it can help you:

=

Hi Roopal,

*Restructure the Table API & SQL Documentation*

Right now the Table / SQL documentation is very technical and dry to read.
It is more like a technical specification than documentation that a new
user could read. This project is to take the existing content and
rewrite/reorganize it to make what the community already has more
approachable.

*Extend the Table API & SQL Documentation*

This is to add brand new content. This could be tutorials and walkthroughs
to help onboard new users or expanding an under-documented section such as
Hive interop.

These projects admittedly sound very similar and I will make a note to
better explain them in future applications if the community applies next
year.

Please let me know if you have any more questions,

Seth



On Wed, 20 May 2020 at 17:13, Roopal Jain  wrote:

> Hi Marta,
>
>
> Just wanted to check if you had time to see my email
>
> On Sun, 17 May 2020, 18:01 Roopal Jain,  wrote:
>
> > Hi Marta,
> >
> > Thank you for responding to my email. I went through the link you
> > attached. I have a few questions:
> > 1. As per the
> https://flink.apache.org/news/2020/05/04/season-of-docs.html,
> > two ideas are mentioned, one is restructuring the Table API & SQL
> > documentation and other is extending the same. I want to know how are
> these
> > two different? Can one person do both?
> > 2. You mentioned that FLIP-60 is very broad, should I start picking one
> > section from it, for example, Table API and start contributing content
> from
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/
> in
> > the specified order in FLIP-60? Can you please guide me more on how
> should
> > I proceed ahead (choosing a specific area)?
> >
> >
> > Thanks & Regards,
> > Roopal Jain
> >
> >
> > On Sat, 16 May 2020 at 17:39, Marta Paes Moreira 
> > wrote:
> >
> >> Hi, Roopal.
> >>
> >> Thanks for reaching out, we're glad to see that you're interested in
> >> giving the Flink docs a try!
> >>
> >> To participate in Google Season of Docs (GSoD), you'll have to submit an
> >> application once the application period opens (June 9th). Because
> FLIP-60
> >> is very broad, the first step would be to think about what parts of it
> >> you'd like to focus on — or, what you'd like to have as a project
> proposal
> >> [1].
> >>
> >> You can find more information about the whole process of participating
> in
> >> GSoD in [2].
> >>
> >> Let me know if I can help you with anything else or if you need support
> >> with choosing your focus areas.
> >>
> >> Marta
> >>
> >> [1]
> >>
> https://developers.google.com/season-of-docs/docs/tech-writer-application-hints#project_proposal
> >> [2]
> >>
> https://developers.google.com/season-of-docs/docs/tech-writer-application-hints
> >>
> >> On Sat, May 16, 2020 at 8:32 AM Roopal Jain  wrote:
> >>
> >>> Hello Flink Dev Community!
> >>>
> >>> I am interested in participating in Google Season of Docs for Apache
> >>> Flink.
> >>> I went through the FLIP-60 detailed proposal and thought this is
> >>> something
> >>> I could do well. I am currently working as a software engineer and
> have a
> >>> B.E in Computer Engineering from one of India's reputed engineering
> >>> colleges. I have prior open-source contribution with mentoring for
> Google
> >>> Summer of Code and Google Code-In.
> >>> I have prior work experience on Apache Spark and a good grasp on SQL,
> >>> Java,
> >>> and Python.
> >>> Please guide me more on how to get started?
> >>>
> >>> Thanks & Regards,
> >>> Roopal Jain
> >>>
> >>
>


Re: [DISCUSS] Semantics of our JIRA fields

2020-05-22 Thread Jark Wu
Thanks Robert for bringing up this. +1 to the proposal.

>From my perspective, I would like we can clearify one more thing about "fix
version/s" in this wiki.
IIRC, if a fix is targeted to be fixed in "1.11.0", then it obviously is
fixed in "1.12.0", so such a bug fix should only set "1.11.0", not both
"1.11.0, 1.12.0".
This rule doesn't apply to 1.11.x where x  >= 1.  This problem happens
frequently during release and such information is quite hidden.

Best,
Jark

On Fri, 22 May 2020 at 17:12, Yuan Mei  wrote:

> +1
> Thanks Robert for these detailed explanations!
> It is definitely useful to have a clear standard to avoid confusion when
> creating Jira, especially for starters.
>
> Thanks for the efforts!
>
> Best,
> Yuan
>
>
> On Fri, May 22, 2020 at 2:07 PM Robert Metzger 
> wrote:
>
> > Hi all,
> >
> > I have the feeling that the semantics of some of our JIRA fields (mostly
> > "affects versions", "fix versions" and resolve / close) are not defined
> in
> > the same way by all the core Flink contributors, which leads to cases
> where
> > I spend quite some time on filling the fields correctly (at least what I
> > consider correctly), and then others changing them again to match their
> > semantics.
> > In an effort to increase our efficiency, and since I'm creating a lot of
> > (test instability-related) tickets these days, I would like to discuss
> the
> > semantics, come to a conclusion and document this in our Wiki.
> >
> > *Proposal:*
> >
> > *Priority:*
> > "Blocker": needs to be resolved before a release (matched based on fix
> > versions)
> > "Critical": strongly considered before a release
> > other priorities have no practical meaning in Flink.
> >
> > *Component/s:*
> > Primary component relevant for this feature / fix.
> > For test-related issues, add the component the test belongs to (for
> example
> > "Connectors / Kafka" for Kafka test failures) + "Test".
> > The same applies for documentation tickets. For example, if there's
> > something wrong with the DataStream API, add it to the "API / DataStream"
> > and "Documentation" components.
> >
> >
> > *Affects Version/s:*Only for Bug / Task-type tickets: We list all
> currently
> > supported and unreleased Flink versions known to be affected by this.
> >
> > Example: If I see a test failure that happens on "master" and
> > "release-1.11", I set "affects version" to "1.12.0" and "1.11.0".
> >
> >
> > *Fix Version/s:*
> > For closed/resolved tickets, this field lists the released Flink versions
> > that contain a fix or feature for the first time.
> > For open tickets, it indicates that a fix / feature should be contained
> in
> > the listed versions. Only blocker issues can block a release, all other
> > tickets which have "fix version/s" set at the time of a release and are
> > unresolved will be moved to the next version.
> >
> > *Assignee:*
> > Person currently working on the ticket. Assigned after conclusion on
> > approach by a committer.
> > Often, fixes are obvious and committers self-assign w/o discussion.
> >
> > *Resolve / Close:*
> > You can either Resolve or Close a ticket once it is done (fixed,
> rejected,
> > invalid, ...).
> >
> > As a rule, we Close tickets instead of Resolving them when they are done.
> >
> > Background: There are semantic differences for Resolve and Close
> > (implementor vs reporter considers it done), but I don't see how they
> > practically apply to the Flink project. Looking at the numbers, Flink has
> > 11066 closed tickets, and 3372 resolved tickets (that's why I propose to
> > close instead of resolve)
> >
> > *Labels:*
> > "test-stability" for all test instabilities
> > "starter" for tickets suitable for new contributors
> >
> > *Release Note:*
> > Small notes that will be included into the release notes published with
> the
> > release.
> >
> >
> > *All other fields are not used not used on a regular basis.*
> >
> > Please +1 my proposal if you want it to be published in our Wiki like
> that
> > or let me know if I got something wrong here.
> >
> > Best,
> > Robert
> >
>


Re: [Discussion] flink elasticsearch connector supports

2020-06-04 Thread Jark Wu
Thanks Jacky for starting this discussion.

The requirement of ES source has been proposed in the community many
times. +1 for the feature from my side.

Here are my thoughts:

1. streaming source
As we only support bounded source for JDBC and HBase, so I think it's fine
to have a bounded ES source.

2. lookup source
Have you ever thought about having ES as a lookup source just like an HBase
table and lookup by index?
I'm not sure whether it works. But my gut feeling tells me it is an
interesting feature and there may be a need for it.

3. DDL options
I agree with Yangze, it is important to list what new options you want to
add. It would be nice to organize your design
 doc according to FLIP template (to have "Public Interface" and "Proposed
Changes").

4. Implement in new table source interface (FLIP-95)
Since 1.11, we proposed a new set of table connector interfaces (FLIP-95)
with more powerful features.
Old table source interface will be removed in the future.

5. DataStream source
It would be nicer to expose a DataStream source too, and share
implementations as much as possible.


Best,
Jark


On Thu, 4 Jun 2020 at 22:07, Etienne Chauchot  wrote:

> Hi,
>
> I made the Elasticsearch connector of Apache Beam and I was thinking
> about doing the same for Flink when I came by this discussion. I have
> some comments regarding the design doc:
>
> 1. Streaming source:
>
> ES has data streams features but only for time series data; the aim of
> this source is to read all kind of data. Apart from data streams,  ES
> behaves like a database: you read the content of an index (similar to a
> table) corresponding to the given query (similar to SQL). So, regarding
> streaming changes, if there are changes between 2 read requests made by
> the source, at the second the whole index (containing the change) will
> be read another time. So, I see no way of having a regular flow of
> documents updates (insertion, deletion, update) as we would need for a
> streaming source. Regarding failover: I guess exactly once semantics
> cannot be guaranteed, only at least once semantics can. Indeed there is
> no ack mechanism on already read data. As a conclusion, IMO you are
> right to target only batch source. Also this answers Yangze Guo's
> question about streaming source. Question is: can a batch only source be
> accepted as a built in flink source ?
>
> 2. hadoop ecosystem
>
> Why not use RichParallelSourceFunction ?
>
> 3. Splitting
>
> Splitting with one split = one ES shard could lead to sub-parallelism.
> IMHO I think that what's important is the number of executors there are
> in the Flink cluster: it is better to use
> runtimeContext.getIndexOfThisSubtask() and
> runtimeContext.getMaxNumberOfParallelSubtasks() to split the input data
> using ES slice API.
>
> 4. Targeting ES 5, 6, 7
>
> In Beam I used low level REST client because it is compatible with all
> ES versions so it allows to have the same code base for all versions.
> But this client is very low level (String based requests). Now, high
> level rest client exists (it was not available at the time), it is the
> one I would use. It is also available for ES 5 so you should use it for
> ES 5 instead of deprecated Transport client.
>
> Best
>
> Etienne Chauchot.
>
>
> On 04/06/2020 08:47, Yangze Guo wrote:
> > Hi, Jackey.
> >
> > Thanks for driving this discussion. I think this proposal should be a
> > FLIP[1] since it impacts the public interface. However, as we have
> > only some preliminary discussions atm, a design draft would be ok. But
> > it would be better to organize your document according to [2].
> >
> > I've two basic questions:
> > - Could your summarize all the public API and configurations (DDL) of
> > the ElasticSearchTableSource?
> > - If we want to implement ElasticSearch DataStream Source at the same
> > time, do we need to do a lot of extra work apart from this?
> >
> > It also would be good if you could do some tests (performance and
> > correctness) to address Robert's comments.
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Jun 3, 2020 at 9:41 AM Jacky Lau  wrote:
> >> Hi Robert Metzger:
> >>  Thanks for your response. could you please read this docs.
> >>
> https://www.yuque.com/jackylau-sc7w6/bve18l/14a2ad5b7f86998433de83dd0f8ec067
> >> . Any Is it any problem here? we are worried about
> >> we do not think  throughly. thanks.
> >>
> >>
> >>
> >> --
> >> Sent from:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>


Re: [DISCUSS] Releasing "fat" and "slim" Flink distributions

2020-06-04 Thread Jark Wu
28M flink-python_2.11-1.10.0.jar
> >> >>22K flink-queryable-state-runtime_2.11-1.10.0.jar
> >> >>18M flink-s3-fs-hadoop-1.10.0.jar
> >> >>31M flink-s3-fs-presto-1.10.0.jar
> >> >> 196K flink-shaded-netty-tcnative-dynamic-2.0.25.Final-9.0.jar
> >> >> 518K flink-sql-client_2.11-1.10.0.jar
> >> >>99K flink-state-processor-api_2.11-1.10.0.jar
> >> >>25M flink-swift-fs-hadoop-1.10.0.jar
> >> >> 160M opt
> >> >>
> >> >> The "filesystem" connectors ar ethe heavy hitters, there.
> >> >>
> >> >> I downloaded most of the SQL connectors/formats and this is what I
> got:
> >> >>
> >> >>73K flink-avro-1.10.0.jar
> >> >>36K flink-csv-1.10.0.jar
> >> >>55K flink-hbase_2.11-1.10.0.jar
> >> >>88K flink-jdbc_2.11-1.10.0.jar
> >> >>42K flink-json-1.10.0.jar
> >> >>20M flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
> >> >> 2.8M flink-sql-connector-kafka_2.11-1.10.0.jar
> >> >>24M sql-connectors-formats
> >> >>
> >> >> We could just add these to the Flink distribution without blowing it
> up
> >> >> by much. We could drop any of the existing "filesystem" connectors
> from
> >> >> opt and add the SQL connectors/formats and not change the size of
> Flink
> >> >> dist. So maybe we should do that instead?
> >> >>
> >> >> We would need some tooling for the sql-client shell script to pick-up
> >> >> the connectors/formats up from opt/ because we don't want to add them
> >> to
> >> >> lib/. We're already doing that for finding the flink-sql-client jar,
> >> >> which is also not in lib/.
> >> >>
> >> >> What do you think?
> >> >>
> >> >> Best,
> >> >> Aljoscha
> >> >>
> >> >> On 17.04.20 05:22, Jark Wu wrote:
> >> >>> Hi,
> >> >>>
> >> >>> I like the idea of web tool to assemble fat distribution. And the
> >> >>> https://code.quarkus.io/ looks very nice.
> >> >>> All the users need to do is just select what he/she need (I think
> this
> >> >> step
> >> >>> can't be omitted anyway).
> >> >>> We can also provide a default fat distribution on the web which
> >> default
> >> >>> selects some popular connectors.
> >> >>>
> >> >>> Best,
> >> >>> Jark
> >> >>>
> >> >>> On Fri, 17 Apr 2020 at 02:29, Rafi Aroch 
> >> wrote:
> >> >>>
> >> >>>> As a reference for a nice first-experience I had, take a look at
> >> >>>> https://code.quarkus.io/
> >> >>>> You reach this page after you click "Start Coding" at the project
> >> >> homepage.
> >> >>>> Rafi
> >> >>>>
> >> >>>>
> >> >>>> On Thu, Apr 16, 2020 at 6:53 PM Kurt Young 
> wrote:
> >> >>>>
> >> >>>>> I'm not saying pre-bundle some jars will make this problem go
> away,
> >> and
> >> >>>>> you're right that only hides the problem for
> >> >>>>> some users. But what if this solution can hide the problem for 90%
> >> >> users?
> >> >>>>> Would't that be good enough for us to try?
> >> >>>>>
> >> >>>>> Regarding to would users following instructions really be such a
> big
> >> >>>>> problem?
> >> >>>>> I'm afraid yes. Otherwise I won't answer such questions for at
> >> least a
> >> >>>>> dozen times and I won't see such questions coming
> >> >>>>> up from time to time. During some periods, I even saw such
> questions
> >> >>>> every
> >> >>>>> day.
> >> >>>>>
> >> >>>>> Best,
> >> >>>>> Kurt
> >> >>>>>
> >> >>>>>
> >> >>>>> On Thu, Apr 16, 2020 at 11:21 PM Chesnay Schepler <
> >> ches...@apache.org>
> >> >>>>> wrote:
> >>

Re: [ANNOUNCE] New Apache Flink Committer - Xintong Song

2020-06-05 Thread Jark Wu
Congratulations Xintong!

Best,
Jark

On Fri, 5 Jun 2020 at 14:32, Danny Chan  wrote:

> Congratulations Xintong !
>
> Best,
> Danny Chan
> 在 2020年6月5日 +0800 PM2:20,dev@flink.apache.org,写道:
> >
> > Congratulations Xintong
>


[ANNOUNCE] New Flink Committer: Benchao Li

2020-06-08 Thread Jark Wu
Hi everyone,

On behalf of the PMC, I'm very happy to announce Benchao Li as a new Apache
Flink committer.

Benchao started contributing to Flink since late 2018. He is very active in
Flink SQL component,
and has also participated in many discussions, bug fixes. Over the past few
months, he helped tremendously in answering user questions in the mailing
list.

Please join me in congratulating Benchao for becoming a Flink committer!

Thanks,
Jark


Re: [DISCUSS] Add Japanese translation of the flink.apache.org website

2020-06-09 Thread Jark Wu
I agree with Dawid and others' opinions.

We may not have enough resources to maintain more languages.
Maybe it's time to investigate better translation/synchronization tools
again.

I want to share some background about the current translation process. In
the initial proposal of Chinese translation FLIP-35 [1],
we have considered Docusaurus/Crowdin as the localization tool, but it
seems that Crowdin doesn't fit well with Jekyll (Liquid codes).
But it's been a year and a half, maybe it's time to re-investigate them or
other tools.

Here is the list of how other ASF projects are dealing translation of what
I know:
- Apache Pulsar uses Crowdin: https://github.com/apache/pulsar-translation
- Apache Kylin: the similar way of Flink:
https://github.com/apache/kylin/tree/document/website
- Apache RocketMQ: a separate repository, synchronize manually and
periodically: https://github.com/apache/rocketmq/tree/master/docs/cn

Here is the list of localization tool of what I know:
- Docusaurus: https://docusaurus.io/
- Crowdin: https://crowdin.com/
- GitLocalize: https://gitlocalize.com/

Best,
Jark

On Tue, 9 Jun 2020 at 16:24, Marta Paes Moreira  wrote:

> Thanks for bringing this PR to our attention, Robert!
>
> Like Dawid and Xintong, I'm concerned that adding a new language will make
> it unbearable to ensure docs synchronization. Best-effort maintenance of
> non-english docs may also result in a bad documentation experience for
> users, and this is something that we should be especially careful about in
> locations where Flink adoption might still be in an early phase, IMO. At
> the same time, I understand that having a japanese translation would
> probably be very helpful for Japanese-speaking users and help with Flink
> adoption in Japan.
>
> I see this happening for other up-and-coming locations (e.g.
> Spanish-speaking countries) in the future, so in order to be able to accept
> these contributions I'm also of the opinion that we first need an efficient
> synchronization tool. Maybe it would be worth investigating how other ASF
> projects are dealing with this.
>
> Marta
>
> On Mon, Jun 8, 2020 at 11:46 AM Xintong Song 
> wrote:
>
> > I think Dawid has a good point. With or without another new language,
> > I'm +1 for trying to have a better tooling/process for synchronizing doc
> > changes.
> >
> > Currently, it is not only hard for people who want to modify the English
> > docs, but also for people who try to update the translated copies
> > accordingly. I'm currently working on updating the Chinese translations
> for
> > the memory configuration documents, and I found it very hard to identify
> > the parts that need updates. The English docs are reorganized, contents
> are
> > moved across pages, and also small pieces of details are modified. It is
> > not always possible for people who work on the English docs to locate the
> > right place in the translations where the updated contents should be
> > pasted.
> >
> > My two cents on the potential approach. We might label the translation
> with
> > the commit id of the original doc where they are in synchronization, and
> > automatically display a warning on the translation if an out-of-sync is
> > detected.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Mon, Jun 8, 2020 at 4:30 PM Dawid Wysakowicz 
> > wrote:
> >
> > > I don't have a well defined opinion on adding new language. However,
> one
> > > thing that I'd like to bring up to the attention on that occasion is it
> > > is already quite cumbersome to update two versions of the docs.
> > > Especially when we add new sections or change smaller parts of an
> > > existing document. Right now if I add three sections in an English
> > > version I have three additional places in the Chinese documents where I
> > > need to paste that. With additional language it doubles, making it 6
> > > places where I have to manually paste the added parts.
> > >
> > > I'd be way more welcoming for adding a new language if we had a better
> > > tooling/process for synchronizing changes across different languages.
> Or
> > > if we agree the translations are best effort documents and we do not
> > > update them when changing the English documents.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 08/06/2020 09:44, Robert Metzger wrote:
> > > > Hi all,
> > > >
> > > > we've received a pull request on flink-web.git for adding a Japanese
> > > > translation of the Flink website:
> > > > https://github.com/apache/flink-web/pull/346
> > > >
> > > > Before we accept this PR, I believe we should have a discussion about
> > it.
> > > >
> > > > *Relevance*: Looking at Google analytics, our users are coming from
> > China
> > > > (1.), US (2.), India (3.), Germany (4.), Japan (5.).
> > > > I'd say that is high enough to consider a Japanese translation.
> > > >
> > > > *Reviewing*: I'm not aware of any committer who speaks Japanese. How
> > > would
> > > > we review this?
> > > > The contributor is offering to open follow-up

Re: [Reminder] Prefer {% link %} tag in documentation

2020-06-09 Thread Jark Wu
+1 to use  {% link %}  tag and add check in CI.

Tips: if want to link a Chinese page, should write: [CLI]({% link ops/
cli.zh.md %})

Best,
Jark

On Wed, 10 Jun 2020 at 10:30, Yangze Guo  wrote:

> Thanks for that reminder, Seth!
>
> +1 to add a check during CI if possible.
>
> Best,
> Yangze Guo
>
> On Wed, Jun 10, 2020 at 3:04 AM Kostas Kloudas  wrote:
> >
> > Thanks for the heads up Seth!
> >
> > Kostas
> >
> > On Tue, Jun 9, 2020 at 7:27 PM Seth Wiesman  wrote:
> > >
> > > The tag is new to Jekyll 4.0 which we only recently updated to.
> > >
> > > There are a lot of existing tags that would need to be updated first :)
> > > I opened a ticket to track that work and then yes that would make
> sense.
> > >
> > > Seth
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-18193
> > >
> > > On Tue, Jun 9, 2020 at 12:05 PM Robert Metzger 
> wrote:
> > >
> > > > Thanks for the reminder. I was also not aware of this tag!
> > > >
> > > > How about enforcing the use of this tag through CI?
> > > > We could for example grep through the added lines of all changes in
> docs/
> > > > and fail the build if we see the wrong pattern.
> > > >
> > > >
> > > > On Tue, Jun 9, 2020 at 4:37 PM Seth Wiesman 
> wrote:
> > > >
> > > > > Whoops, responded from the wrong email :)
> > > > >
> > > > > Thank you for noticing that, the guide is out of date. I will fix
> that
> > > > > immediately!
> > > > >
> > > > > On Tue, Jun 9, 2020 at 9:36 AM Seth Wiesman 
> wrote:
> > > > >
> > > > > > Thank you for noticing that, the guide is out of date. I will
> fix that
> > > > > > immediately!
> > > > > >
> > > > > > On Tue, Jun 9, 2020 at 9:34 AM Dawid Wysakowicz <
> > > > dwysakow...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Seth,
> > > > > >>
> > > > > >> Thanks I did not know that.
> > > > > >>
> > > > > >> I am not entirely sure, but I think our documentation guide is
> > > > slightly
> > > > > >> outdated on that manner (
> > > > > >> https://flink.apache.org/contributing/docs-style.html) Or is
> there a
> > > > > >> mistake in your example? Our guide recommends:
> > > > > >>
> > > > > >> [CLI]({{ site.baseurl }}{% link ops/cli.md %})
> > > > > >>
> > > > > >>
> > > > > >> Best,
> > > > > >>
> > > > > >> Dawid
> > > > > >> On 09/06/2020 16:20, Seth Wiesman wrote:
> > > > > >>
> > > > > >> Hi Everyone!
> > > > > >>
> > > > > >> As we are seeing an influx of documentation PRs in anticipation
> of the
> > > > > 1.11
> > > > > >> release I would like to remind everyone to use the {% link %}
> tag when
> > > > > >> cross-linking pages[1]. This is opposed to creating a link
> based on
> > > > > >> site.baseurl.
> > > > > >>
> > > > > >> Going forward a link such as:
> > > > > >>
> > > > > >> [CLI]({% site.baseurl %}/ops/cli.html)
> > > > > >>
> > > > > >> Should be written as:
> > > > > >>
> > > > > >> [CLI]({% link ops/cli.md %})
> > > > > >>
> > > > > >> This tag will fail the build on broken links which will help us
> > > > prevent
> > > > > >> 404s on the website.
> > > > > >>
> > > > > >> You can see a good example of the link tag in action here[2].
> > > > > >>
> > > > > >> Seth
> > > > > >>
> > > > > >> [1] https://jekyllrb.com/docs/liquid/tags/
> > > > > >> [2]
> > > > >
> > > >
> https://github.com/apache/flink/blame/b6ea96251d101ca25aa6a6b92170cfa4274b4cc3/docs/index.md#L65-L67
> > > > > >>
> > > > > >>
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Seth Wiesman | Solutions Architect
> > > > > >
> > > > > > +1 314 387 1463
> > > > > >
> > > > > > 
> > > > > >
> > > > > > Follow us @VervericaData
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Join Flink Forward  - The Apache
> Flink
> > > > > > Conference
> > > > > >
> > > > > > Stream Processing | Event Driven | Real Time
> > > > > >
> > > > >
> > > >
>


Re: request create flip permission for flink es bounded source/lookup source connector

2020-06-12 Thread Jark Wu
Hi Jacky,

What's your username in wiki? So that I can give the permission to you.

Best,
Jark

On Fri, 12 Jun 2020 at 11:38, Jacky Lau  wrote:

> hi all:
>After this simple discussion here
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discussion-flink-elasticsearch-connector-supports-td42082.html#a42106
> ,
>and i should create i flip127 to  track this. But i don't have create
> flip permision.
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>


Re: [DISCUSS] SQL Syntax for Table API StatementSet

2020-06-15 Thread Jark Wu
Hi Fabian,

Thanks for starting this discussion. I think this is a very important
syntax to support file mode and multi-statement for SQL Client.
I'm +1 to introduce a syntax to group SQL statements to execute together.

As a reference, traditional database systems also have similar syntax, such
as "START/BEGIN TRANSACTION ... COMMIT" to group statements as a
transaction [1],
and also "BEGIN ... END" [2] [3] to group a set of SQL statements that
execute together.

Maybe we can also use "BEGIN ... END" syntax which is much simpler?

Regarding where to implement, I also prefer to have it in Flink SQL core,
here are some reasons from my side:
1) I think many downstream projects (e.g Zeppelin) will have the same
requirement. It would be better to have it in core instead of reinventing
the wheel by users.
2) Having it in SQL CLI means it is a standard syntax to support statement
set in Flink. So I think it makes sense to have it in core too, otherwise,
it looks like a broken feature.
In 1.10, CREATE VIEW is only supported in SQL CLI, not supported in
TableEnvironment, which confuses many users.
3) Currently, we are moving statement parsing to use sql-parser
(FLINK-17728). Calcite has a good support for parsing multi-statements.
It will be tricky to parse multi-statements only in SQL Client.

Best,
Jark

[1]:
https://docs.microsoft.com/en-us/sql/t-sql/language-elements/begin-transaction-transact-sql?view=sql-server-ver15
[2]:
https://www.sqlservertutorial.net/sql-server-stored-procedures/sql-server-begin-end/
[3]: https://dev.mysql.com/doc/refman/8.0/en/begin-end.html

On Mon, 15 Jun 2020 at 20:50, Fabian Hueske  wrote:

> Hi everyone,
>
> FLIP-84 [1] added the concept of a "statement set" to group multiple INSERT
> INTO statements (SQL or Table API) together. The statements in a statement
> set are jointly optimized and executed as a single Flink job.
>
> I would like to start a discussion about a SQL syntax to group multiple
> INSERT INTO statements in a statement set. The use case would be to expose
> the statement set feature to a solely text based client for Flink SQL such
> as Flink's SQL CLI [1].
>
> During the discussion of FLIP-84, we had briefly talked about such a syntax
> [3].
>
> START STATEMENT SET;
> INSERT INTO ... SELECT ...;
> INSERT INTO ... SELECT ...;
> ...
> END STATEMENT SET;
>
> We didn't follow up on this proposal, to keep the focus on the FLIP-84
> Table API changes and to not dive into a discussion about multiline SQL
> query support [4].
>
> While this feature is clearly based on multiple SQL queries, I think it is
> a bit different from what we usually understand as multiline SQL support.
> That's because a statement set ends up to be a single Flink job. Hence,
> there is no need on the Flink side to coordinate the execution of multiple
> jobs (incl. the discussion about blocking or async execution of queries).
> Flink would treat the queries in a STATEMENT SET as a single query.
>
> I would like to start a discussion about supporting the [START|END]
> STATEMENT SET syntax (or a different syntax with equivalent semantics) in
> Flink.
> I don't have a strong preference whether this should be implemented in
> Flink's SQL core or be a purely client side implementation in the CLI
> client. It would be good though to have parser support in Flink for this.
>
> What do others think?
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html
> [3]
>
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#heading=h.al86t1h4ecuv
> [4]
>
> https://lists.apache.org/thread.html/rf494e227c47010c91583f90eeaf807d3a4c3eb59d105349afd5fdc31%40%3Cdev.flink.apache.org%3E
>


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Jark Wu
Congratulations Yu! Well deserved!

Best,
Jark

On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:

> Congratulations Yu!
>
> Best,
> Haibo
>
>
> At 2020-06-17 09:15:02, "jincheng sun"  wrote:
> >Hi all,
> >
> >On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> >part of the Apache Flink Project Management Committee (PMC).
> >
> >Yu Li has been very active on Flink's Statebackend component, working on
> >various improvements, for example the RocksDB memory management for 1.10.
> >and keeps checking and voting for our releases, and also has successfully
> >produced two releases(1.10.0&1.10.1) as RM.
> >
> >Congratulations & Welcome Yu Li!
> >
> >Best,
> >Jincheng (on behalf of the Flink PMC)
>
>


Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

2020-06-16 Thread Jark Wu
Hi,

Which Flink version are you using? Are you using SQL CLI? Could you share
your table/sql program?
We did fix some classloading problems around SQL CLI, e.g. FLINK-18302

Best,
Jark

On Wed, 17 Jun 2020 at 10:31, 杜斌  wrote:

> add the full stack trace here:
>
>
> Caused by:
>
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue.
> at
>
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> at
>
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> at
>
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> at
>
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> ... 14 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
> program cannot be compiled. This is a bug. Please file an issue.
> at
>
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> at
>
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> at
>
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> at
>
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> at
>
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> at
>
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> at
>
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> ... 17 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column
> 46: Cannot determine simple type name "org"
> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> at
>
> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
> at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
> at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
> at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> at
>
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> at
>
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> at
>
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> at
>
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> ... 23 more
>
> 杜斌  于2020年6月17日周三 上午10:29写道:
>
> > Hi,
> > Need 

Re: [Reminder] Prefer {% link %} tag in documentation

2020-06-16 Thread Jark Wu
Hi everyone,

Before going to use {% link %} and adding a CI to force using link tag, I
suggest adding a CI profile to check broken links first.

The background is that, recently, I noticed that many contributors are
beginning to use link tags, but forget to link ".zh.md" instead of ".md" in
Chinese documentation.
This leads to the docs build failing in the last two days [1]. I have fixed
a couple of broken links. But if we don't have a CI mechanism, this would
make the docs build unstable.

Best,
Jark

[1]: https://ci.apache.org/builders/flink-docs-master

On Mon, 15 Jun 2020 at 12:48, Congxian Qiu  wrote:

> +1 to use {% link %} tag and add a check during CI.
> for Chinese doc, will suggest the Chinese translate contributor use the {%
> link %} tag when reviewing the translate pr.
>
> Best,
> Congxian
>
>
> Jark Wu  于2020年6月10日周三 上午10:48写道:
>
> > +1 to use  {% link %}  tag and add check in CI.
> >
> > Tips: if want to link a Chinese page, should write: [CLI]({% link ops/
> > cli.zh.md %})
> >
> > Best,
> > Jark
> >
> > On Wed, 10 Jun 2020 at 10:30, Yangze Guo  wrote:
> >
> > > Thanks for that reminder, Seth!
> > >
> > > +1 to add a check during CI if possible.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Wed, Jun 10, 2020 at 3:04 AM Kostas Kloudas 
> > wrote:
> > > >
> > > > Thanks for the heads up Seth!
> > > >
> > > > Kostas
> > > >
> > > > On Tue, Jun 9, 2020 at 7:27 PM Seth Wiesman 
> > wrote:
> > > > >
> > > > > The tag is new to Jekyll 4.0 which we only recently updated to.
> > > > >
> > > > > There are a lot of existing tags that would need to be updated
> first
> > :)
> > > > > I opened a ticket to track that work and then yes that would make
> > > sense.
> > > > >
> > > > > Seth
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-18193
> > > > >
> > > > > On Tue, Jun 9, 2020 at 12:05 PM Robert Metzger <
> rmetz...@apache.org>
> > > wrote:
> > > > >
> > > > > > Thanks for the reminder. I was also not aware of this tag!
> > > > > >
> > > > > > How about enforcing the use of this tag through CI?
> > > > > > We could for example grep through the added lines of all changes
> in
> > > docs/
> > > > > > and fail the build if we see the wrong pattern.
> > > > > >
> > > > > >
> > > > > > On Tue, Jun 9, 2020 at 4:37 PM Seth Wiesman  >
> > > wrote:
> > > > > >
> > > > > > > Whoops, responded from the wrong email :)
> > > > > > >
> > > > > > > Thank you for noticing that, the guide is out of date. I will
> fix
> > > that
> > > > > > > immediately!
> > > > > > >
> > > > > > > On Tue, Jun 9, 2020 at 9:36 AM Seth Wiesman <
> s...@ververica.com>
> > > wrote:
> > > > > > >
> > > > > > > > Thank you for noticing that, the guide is out of date. I will
> > > fix that
> > > > > > > > immediately!
> > > > > > > >
> > > > > > > > On Tue, Jun 9, 2020 at 9:34 AM Dawid Wysakowicz <
> > > > > > dwysakow...@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi Seth,
> > > > > > > >>
> > > > > > > >> Thanks I did not know that.
> > > > > > > >>
> > > > > > > >> I am not entirely sure, but I think our documentation guide
> is
> > > > > > slightly
> > > > > > > >> outdated on that manner (
> > > > > > > >> https://flink.apache.org/contributing/docs-style.html) Or
> is
> > > there a
> > > > > > > >> mistake in your example? Our guide recommends:
> > > > > > > >>
> > > > > > > >> [CLI]({{ site.baseurl }}{% link ops/cli.md %})
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >>
> > > > > > > >> Dawid
> > > > >

Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

2020-06-16 Thread Jark Wu
Why compile JobGraph yourself? This is really an internal API and may cause
problems.
Could you try to use `flink run` command [1] to submit your user jar
instead?

Btw, what's your Flink version? If you are using Flink 1.10.0, could you
try to use 1.10.1?

Best,
Jark

On Wed, 17 Jun 2020 at 12:41, 杜斌  wrote:

> Thanks for the reply,
> Here is the simple java program that re-produce the problem:
> 1. code for the application:
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
>
> import java.util.Arrays;
>
> public class Test {
> public static void main(String[] args) throws Exception {
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> /**
>  * If enable checkpoint, blink planner will failed
>  */
> env.enableCheckpointing(1000);
>
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> //.useBlinkPlanner() // compile fail
> .useOldPlanner() // compile success
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv =
> StreamTableEnvironment.create(env, envSettings);
> DataStream orderA = env.fromCollection(Arrays.asList(
> new Order(1L, "beer", 3),
> new Order(1L, "diaper", 4),
> new Order(3L, "beer", 2)
> ));
>
> //Table table = tEnv.fromDataStream(orderA);
> tEnv.createTemporaryView("orderA", orderA);
> Table res = tEnv.sqlQuery("SELECT * FROM orderA");
> DataStream> ds =
> tEnv.toRetractStream(res, Row.class);
> ds.print();
> env.execute();
>
> }
>
> public static class Order {
> public long user;
> public String product;
> public int amount;
>
> public Order(long user, String product, int amount) {
> this.user = user;
> this.product = product;
> this.amount = amount;
> }
>
> public long getUser() {
> return user;
> }
>
> public void setUser(long user) {
> this.user = user;
> }
>
> public String getProduct() {
> return product;
> }
>
> public void setProduct(String product) {
> this.product = product;
> }
>
> public int getAmount() {
> return amount;
> }
>
> public void setAmount(int amount) {
> this.amount = amount;
> }
> }
> }
>
> 2. mvn clean package to a jar file
> 3. then we use the following code to produce a jobgraph:
>
> PackagedProgram program =
> PackagedProgram.newBuilder()
> .setJarFile(userJarPath)
> .setUserClassPaths(classpaths)
> .setEntryPointClassName(userMainClass)
> .setConfiguration(configuration)
>
> .setSavepointRestoreSettings((descriptor.isRecoverFromSavepoint() &&
> descriptor.getSavepointPath() != null &&
> !descriptor.getSavepointPath().equals("")) ?
>
> SavepointRestoreSettings.forPath(descriptor.getSavepointPath(),
> descriptor.isAllowNonRestoredState()) :
> SavepointRestoreSettings.none())
> .setArguments(userArgs)
> .build();
>
>
> JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program,
> configuration, 4, true);
>
> 4. If we use blink planner & enable checkpoint, the compile will failed.
> For the others, the compile success.
>
> Thanks,
> Bin
>
> Jark Wu  于2020年6月17日周三 上午10:42写道:
>
> > Hi,
> >
> > Which Flink version are you using? Are you using SQL CLI? Could you share
> > your table/sql program?
> > We did fix some classloading problems around SQL CLI, e.g. FLINK-18302
> >
> > Best,
> > Jark
> >
> > On Wed, 17 Jun 2020 at 10:31, 杜斌  wrote:
> >
> > > add the full stack trace here:
> > >
> > >
> > > Caused by:
> > >
> > >
> >
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
> > > org.apache.flink.api.common.InvalidProgramException: Table progra

Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

2020-06-17 Thread Jark Wu
Glad to hear that!

On Thu, 18 Jun 2020 at 09:12, 杜斌  wrote:

> Sure, it helps a lot. After update to Flink 1.10.1, problem solved!
>
> Thanks,
> Bin
>
> Fabian Hueske  于2020年6月17日周三 下午4:32写道:
>
> > The exception is thrown when the StreamGraph is translated to a JobGraph.
> > The translation logic has a switch. If checkpointing is enabled, the Java
> > code generated by the optimizer is directly compiled in the client (in
> > contrast to compiling it on the TaskManagers when the operators are
> > started).
> > The compiler needs access to the UDF classes but the classloader that's
> > being used doesn't know about the UDF classes.
> >
> > The classloader that's used for the compilation is generated by
> > PackagedProgram.
> > You can configure the PackagedProgram with the right user code JAR URLs
> > which contain the UDF classes.
> > Alternatively, you can try to inject another classloader into
> > PackagedProgram using Reflection (but that's a rather hacky approach).
> >
> > Hope this helps.
> >
> > Cheers, Fabian
> >
> > Am Mi., 17. Juni 2020 um 06:48 Uhr schrieb Jark Wu :
> >
> > > Why compile JobGraph yourself? This is really an internal API and may
> > cause
> > > problems.
> > > Could you try to use `flink run` command [1] to submit your user jar
> > > instead?
> > >
> > > Btw, what's your Flink version? If you are using Flink 1.10.0, could
> you
> > > try to use 1.10.1?
> > >
> > > Best,
> > > Jark
> > >
> > > On Wed, 17 Jun 2020 at 12:41, 杜斌  wrote:
> > >
> > > > Thanks for the reply,
> > > > Here is the simple java program that re-produce the problem:
> > > > 1. code for the application:
> > > >
> > > > import org.apache.flink.api.java.tuple.Tuple2;
> > > > import org.apache.flink.streaming.api.datastream.DataStream;
> > > > import
> > > >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > > import org.apache.flink.table.api.EnvironmentSettings;
> > > > import org.apache.flink.table.api.Table;
> > > > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > > > import org.apache.flink.types.Row;
> > > >
> > > >
> > > > import java.util.Arrays;
> > > >
> > > > public class Test {
> > > > public static void main(String[] args) throws Exception {
> > > >
> > > > StreamExecutionEnvironment env =
> > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > /**
> > > >  * If enable checkpoint, blink planner will failed
> > > >  */
> > > > env.enableCheckpointing(1000);
> > > >
> > > > EnvironmentSettings envSettings =
> > > EnvironmentSettings.newInstance()
> > > > //.useBlinkPlanner() // compile fail
> > > > .useOldPlanner() // compile success
> > > > .inStreamingMode()
> > > > .build();
> > > > StreamTableEnvironment tEnv =
> > > > StreamTableEnvironment.create(env, envSettings);
> > > > DataStream orderA = env.fromCollection(Arrays.asList(
> > > > new Order(1L, "beer", 3),
> > > > new Order(1L, "diaper", 4),
> > > > new Order(3L, "beer", 2)
> > > > ));
> > > >
> > > > //Table table = tEnv.fromDataStream(orderA);
> > > > tEnv.createTemporaryView("orderA", orderA);
> > > > Table res = tEnv.sqlQuery("SELECT * FROM orderA");
> > > > DataStream> ds =
> > > > tEnv.toRetractStream(res, Row.class);
> > > > ds.print();
> > > > env.execute();
> > > >
> > > > }
> > > >
> > > > public static class Order {
> > > > public long user;
> > > > public String product;
> > > > public int amount;
> > > >
> > > > public Order(long user, String product, int amount) {
> > > > this.user = user;
> > > > this.product = product;
> > > > this.amount = amount;
> > > > }
> > > >
> > > >   

<    2   3   4   5   6   7   8   9   10   11   >