[jira] [Created] (FLINK-17752) Align the timestamp format with Flink SQL types in JSON format

2020-05-16 Thread Jark Wu (Jira)
Jark Wu created FLINK-17752:
---

 Summary: Align the timestamp format with Flink SQL types in JSON 
format
 Key: FLINK-17752
 URL: https://issues.apache.org/jira/browse/FLINK-17752
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jark Wu
 Fix For: 1.11.0


Currently, we are using RFC3339_TIMESTAMP_FORMAT (which adds a timezone at 
the end of the string) as the timestamp format in JSON. However, the string 
representation of {{TIMESTAMP (WITHOUT TIME ZONE)}} shouldn't add a 'Z' at the 
end.
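
For illustration, a minimal sketch of the two string representations using plain java.time (not the actual Flink serializer code):

{code:java}
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampFormatExample {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2020, 5, 16, 12, 30, 45);

        // RFC 3339 style appends a zone designator: "2020-05-16T12:30:45Z"
        String rfc3339 = ts.atOffset(ZoneOffset.UTC)
                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);

        // TIMESTAMP (WITHOUT TIME ZONE) should keep the local form: "2020-05-16T12:30:45"
        String withoutTimeZone = ts.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);

        System.out.println(rfc3339);
        System.out.println(withoutTimeZone);
    }
}
{code}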



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17693) Add createTypeInformation to DynamicTableSink#Context

2020-05-14 Thread Jark Wu (Jira)
Jark Wu created FLINK-17693:
---

 Summary: Add createTypeInformation to DynamicTableSink#Context
 Key: FLINK-17693
 URL: https://issues.apache.org/jira/browse/FLINK-17693
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.11.0


Currently, we have {{createTypeInformation}} on {{DynamicTableSource#Context}}, 
but not on {{DynamicTableSink#Context}}.
In some sink connectors, we need to buffer the {{RowData}} and flush at some 
interval. So we need to copy the {{RowData}} if object reuse is enabled, and 
the internal TypeInformation is needed to obtain the RowData {{TypeSerializer}} for the copy.
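
A rough sketch of the intended usage, assuming the sink-side context gets the same {{createTypeInformation(DataType)}} method as the source-side one (the class and helper names below are illustrative, not the final API):

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;

public class BufferingSinkSketch {

    private final TypeSerializer<RowData> serializer;
    private final List<RowData> buffer = new ArrayList<>();

    // typeInfo would come from DynamicTableSink.Context#createTypeInformation(...)
    public BufferingSinkSketch(TypeInformation<RowData> typeInfo) {
        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
    }

    public void add(RowData row, boolean objectReuseEnabled) {
        // copy before buffering, otherwise the reused instance may be mutated
        // by the runtime before the buffer is flushed at the next interval
        buffer.add(objectReuseEnabled ? serializer.copy(row) : row);
    }

    public void flush() {
        // write the buffered rows to the external system, then clear the buffer
        buffer.clear();
    }
}
{code}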



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17689) Add integration tests for Debezium and Canal formats

2020-05-14 Thread Jark Wu (Jira)
Jark Wu created FLINK-17689:
---

 Summary: Add integration tests for Debezium and Canal formats
 Key: FLINK-17689
 URL: https://issues.apache.org/jira/browse/FLINK-17689
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.11.0


Currently, we don't have IT cases for the Debezium and Canal formats, because 
no connectors support the new formats yet. We will wait for FLINK-17026 and 
then add IT cases in the Kafka connector module.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17647) Improve new connector options exception in old planner

2020-05-12 Thread Jark Wu (Jira)
Jark Wu created FLINK-17647:
---

 Summary: Improve new connector options exception in old planner
 Key: FLINK-17647
 URL: https://issues.apache.org/jira/browse/FLINK-17647
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Legacy Planner
Reporter: Jark Wu


Currently, if users use a new factory in the old planner, the exception is 
misleading. We should improve the exception in the old planner to tell users 
that they probably should use the blink planner.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17633) Improve FactoryUtil to align with new format options keys

2020-05-12 Thread Jark Wu (Jira)
Jark Wu created FLINK-17633:
---

 Summary: Improve FactoryUtil to align with new format options keys
 Key: FLINK-17633
 URL: https://issues.apache.org/jira/browse/FLINK-17633
 Project: Flink
  Issue Type: Sub-task
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.11.0


As discussed in 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Hierarchies-in-ConfigOption-td40920.html,
 we propose to use the following new format options: 

{code:java}
format = json
json.fail-on-missing-field = true
json.ignore-parse-error = true

value.format = json
value.json.fail-on-missing-field = true
value.json.ignore-parse-error = true
{code}

However, the current {{discoverScan/SinkFormat}} of {{FactoryUtil}} uses 
{{value.format.fail-on-missing-field}} as the key, which is not handy for 
connectors to use. 
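
For illustration, this is roughly how the proposed keys would appear in a DDL (the table name, schema and connector options are made up; {{tableEnv}} is an assumed {{TableEnvironment}}):

{code:java}
// sketch only: a Kafka table whose value format uses the new option keys
tableEnv.executeSql(
    "CREATE TABLE orders (\n"
        + "  order_id BIGINT,\n"
        + "  product STRING\n"
        + ") WITH (\n"
        + "  'connector' = 'kafka',\n"
        + "  'topic' = 'orders',\n"
        + "  'format' = 'json',\n"
        + "  'json.fail-on-missing-field' = 'true',\n"
        + "  'json.ignore-parse-error' = 'true'\n"
        + ")");
{code}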




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17630) Implement format factory for Avro serialization and deserialization schema

2020-05-12 Thread Jark Wu (Jira)
Jark Wu created FLINK-17630:
---

 Summary: Implement format factory for Avro serialization and 
deserialization schema
 Key: FLINK-17630
 URL: https://issues.apache.org/jira/browse/FLINK-17630
 Project: Flink
  Issue Type: Sub-task
Reporter: Jark Wu






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17629) Implement format factory for JSON serialization and deserialization schema

2020-05-12 Thread Jark Wu (Jira)
Jark Wu created FLINK-17629:
---

 Summary: Implement format factory for JSON serialization and 
deserialization schema
 Key: FLINK-17629
 URL: https://issues.apache.org/jira/browse/FLINK-17629
 Project: Flink
  Issue Type: Sub-task
Reporter: Jark Wu






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17625) Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction

2020-05-11 Thread Jark Wu (Jira)
Jark Wu created FLINK-17625:
---

 Summary: Fix ArrayIndexOutOfBoundsException in 
AppendOnlyTopNFunction
 Key: FLINK-17625
 URL: https://issues.apache.org/jira/browse/FLINK-17625
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Jark Wu
 Fix For: 1.11.0, 1.10.2


This is reported in user mailing list: 
http://apache-flink.147419.n8.nabble.com/sql-topN-ArrayIndexOutOfBoundsException-td3008.html

We should check that the list is not empty before removing the last element in 
{{AppendOnlyTopNFunction#processElementWithoutRowNumber}}.

{code:java}
java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:422)
at java.util.ArrayList.remove(ArrayList.java:499)
at 
org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElementWithoutRowNumber(AppendOnlyTopNFunction.java:205)
at 
org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElement(AppendOnlyTopNFunction.java:120)
at 
org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction.processElement(AppendOnlyTopNFunction.java:46)
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
{code}
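
For illustration, the proposed guard would look roughly like this (a sketch, not the actual patch; the list variable name is made up):

{code:java}
// in processElementWithoutRowNumber(...): only drop the last element when the
// buffered list is non-empty, so an empty list no longer leads to remove(-1)
if (!bufferedRows.isEmpty()) {
    bufferedRows.remove(bufferedRows.size() - 1);
}
{code}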




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

2020-05-11 Thread Jark Wu
Thanks for the explanation. I like the factory pattern to make the member
variables immutable and final.

So +1 to the proposal.

Best,
Jark

On Mon, 11 May 2020 at 22:01, Stephan Ewen  wrote:

> I am fine with that.
>
> Much of the principles seem agreed upon. I understand the need to support
> code-generated extractors and we should support most of it already (as
> Aljoscha mentioned via the factories) can extend this if needed.
>
> I think that the factory approach supports code-generated extractors in a
> cleaner way even than an extractor with an open/init method.
>
>
> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek 
> wrote:
>
> > We're slightly running out of time. I would propose we vote on the basic
> > principle and remain open to later additions. This feature is quite
> > important to make the new Kafka Source that is developed as part of
> > FLIP-27 useful. Otherwise we would have to use the legacy interfaces in
> > the newly added connector.
> >
> > I know that's a bit unorthodox but would everyone be OK with what's
> > currently there and then we iterate?
> >
> > Best,
> > Aljoscha
> >
> > On 11.05.20 13:57, Aljoscha Krettek wrote:
> > > Ah, I meant to write this in my previous email, sorry about that.
> > >
> > > The WatermarkStrategy, which is basically a factory for a
> > > WatermarkGenerator is the replacement for the open() method. This is
> the
> > > same strategy that was followed for StreamOperatorFactory, which was
> > > introduced to allow code generation in the Table API [1]. If we need
> > > metrics or other things we would add that as a parameter to the factory
> > > method. What do you think?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-11974
> > >
> > > On 10.05.20 05:07, Jark Wu wrote:
> > >> Hi,
> > >>
> > >> Regarding to the `open()/close()`, I think it's necessary for
> > >> Table to
> > >> compile the generated code.
> > >> In Table, the watermark strategy and event-timestamp is defined
> > using
> > >> SQL expressions, we will
> > >> translate and generate Java code for the expressions. If we have
> > >> `open()/close()`, we don't need lazy initialization.
> > >> Besides that, I can see a need to report some metrics, e.g. the
> current
> > >> watermark, the dirty timestamps (null value), etc.
> > >> So I think a simple `open()/close()` with a context which can get
> > >> MetricGroup is nice and not complex for the first version.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >>
> > >>
> > >> On Sun, 10 May 2020 at 00:50, Stephan Ewen  wrote:
> > >>
> > >>> Thanks, Aljoscha, for picking this up.
> > >>>
> > >>> I agree with the approach of doing the here proposed set of changes
> for
> > >>> now. It already makes things simpler and adds idleness support
> > >>> everywhere.
> > >>>
> > >>> Rich functions and state always add complexity, let's do this in a
> next
> > >>> step, if we have a really compelling case.
> > >>>
> > >>>
> > >>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <
> aljos...@apache.org>
> > >>> wrote:
> > >>>
> > >>>> Regarding the WatermarkGenerator (WG) interface itself. The proposal
> > is
> > >>>> basically to turn emitting into a "flatMap", we give the
> > >>>> WatermarkGenerator a "collector" (the WatermarkOutput) and the WG
> can
> > >>>> decide whether to output a watermark or not and can also mark the
> > >>>> output
> > >>>> as idle. Changing the interface to return a Watermark (as the
> previous
> > >>>> watermark assigner interface did) would not allow that flexibility.
> > >>>>
> > >>>> Regarding checkpointing the watermark and keeping track of the
> minimum
> > >>>> watermark, this would be the responsibility of the framework (or the
> > >>>> KafkaConsumer in the current implementation). The user-supplied WG
> > does
> > >>>> not need to make sure the watermark doesn't regress.
> > >>>>
> > >>>> Regarding making the WG a "rich function", I can see the potential
> > >>

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

2020-05-09 Thread Jark Wu
Hi,

Regarding the `open()/close()`, I think it's necessary for the Table API to
compile the generated code.
In the Table API, the watermark strategy and event timestamp are defined using
SQL expressions, and we will
translate and generate Java code for the expressions. If we have
`open()/close()`, we don't need lazy initialization.
Besides that, I can see a need to report some metrics, e.g. the current
watermark, the dirty timestamps (null value), etc.
So I think a simple `open()/close()` with a context which can get
MetricGroup is nice and not complex for the first version.

Best,
Jark



On Sun, 10 May 2020 at 00:50, Stephan Ewen  wrote:

> Thanks, Aljoscha, for picking this up.
>
> I agree with the approach of doing the here proposed set of changes for
> now. It already makes things simpler and adds idleness support everywhere.
>
> Rich functions and state always add complexity, let's do this in a next
> step, if we have a really compelling case.
>
>
> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek 
> wrote:
>
> > Regarding the WatermarkGenerator (WG) interface itself. The proposal is
> > basically to turn emitting into a "flatMap", we give the
> > WatermarkGenerator a "collector" (the WatermarkOutput) and the WG can
> > decide whether to output a watermark or not and can also mark the output
> > as idle. Changing the interface to return a Watermark (as the previous
> > watermark assigner interface did) would not allow that flexibility.
> >
> > Regarding checkpointing the watermark and keeping track of the minimum
> > watermark, this would be the responsibility of the framework (or the
> > KafkaConsumer in the current implementation). The user-supplied WG does
> > not need to make sure the watermark doesn't regress.
> >
> > Regarding making the WG a "rich function", I can see the potential
> > benefit but I also see a lot of pitfalls. For example, how should the
> > watermark state be handled in the case of scale-in? It could be made to
> > work in the Kafka case by attaching the state to the partition state
> > that we keep, but then we have potential backwards compatibility
> > problems also for the WM state. Does the WG usually need to keep the
> > state or might it be enough if the state is transient, i.e. if you have
> > a restart the WG would loose its histogram but it would rebuild it
> > quickly and you would get back to the same steady state as before.
> >
> > Best,
> > Aljoscha
> >
> > On 27.04.20 12:12, David Anderson wrote:
> > > Overall I like this proposal; thanks for bringing it forward, Aljoscha.
> > >
> > > I also like the idea of making the Watermark generator a rich function
> --
> > > this should make it more straightforward to implement smarter watermark
> > > generators. Eg, one that uses state to keep statistics about the actual
> > > out-of-orderness, and uses those statistics to implement a variable
> > delay.
> > >
> > > David
> > >
> > > On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas 
> > wrote:
> > >
> > >> Hi Aljoscha,
> > >>
> > >> Thanks for opening the discussion!
> > >>
> > >> I have two comments on the FLIP:
> > >> 1) we could add lifecycle methods to the Generator, i.e. open()/
> > >> close(), probably with a Context as argument: I have not fully thought
> > >> this through but I think that this is more aligned with the rest of
> > >> our rich functions. In addition, it will allow, for example, to
> > >> initialize the Watermark value, if we decide to checkpoint the
> > >> watermark (see [1]) (I also do not know if Table/SQL needs to do
> > >> anything in the open()).
> > >> 2) aligned with the above, and with the case where we want to
> > >> checkpoint the watermark in mind, I am wondering about how we could
> > >> implement this in the future. In the FLIP, it is proposed to expose
> > >> the WatermarkOutput in the methods of the WatermarkGenerator. Given
> > >> that there is the implicit contract that watermarks are
> > >> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
> > >> assume) a check that will compare the last emitted WM against the
> > >> provided one, and emit it only if it is >=. If not, then we risk
> > >> having the user shooting himself on the foot if he/she accidentally
> > >> forgets the check. Given that the WatermarkGenerator and its caller do
> > >> not know if the watermark was finally emitted or not (the
> > >> WatermarkOutput#emitWatermark returns void), who will be responsible
> > >> for checkpointing the WM?
> > >>
> > >> Given this, why not having the methods as:
> > >>
> > >> public interface WatermarkGenerator<T> {
> > >>
> > >>  Watermark onEvent(T event, long eventTimestamp, WatermarkOutput
> > >> output);
> > >>
> > >>  Watermark onPeriodicEmit(WatermarkOutput output);
> > >> }
> > >>
> > >> and the caller will be the one enforcing any invariants, such as
> > >> non-decreasing watermarks. In this way, the caller can checkpoint
> > >> anything that is needed as it will have complete knowledge as to if
> > >> the WM was emitted 

Re: [DISCUSS] Align the behavior of internal return result of MapState#entries, keys, values and iterator.

2020-05-09 Thread Jark Wu
+1 to return an empty iterator and align the implementations.

Best,
Jark

On Sat, 9 May 2020 at 19:18, SteNicholas  wrote:

> Hi Tang Yun,
>  I agree with the point you mentioned that align these internal
> behavior
> to return empty iterator instead of null. In my opinion,
> StateMapViewWithKeysNullable handle nullable map keys, and result of
> internal map state should be empty map in the null behavior case.
> Therefore,
> as you mentioned, #iterator() should better return empty iterator.
>
> Thanks,
> Nicholas Jiang
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>


[jira] [Created] (FLINK-17591) TableEnvironmentITCase.testExecuteSqlAndToDataStream failed

2020-05-09 Thread Jark Wu (Jira)
Jark Wu created FLINK-17591:
---

 Summary: TableEnvironmentITCase.testExecuteSqlAndToDataStream 
failed
 Key: FLINK-17591
 URL: https://issues.apache.org/jira/browse/FLINK-17591
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner
Reporter: Jark Wu


Here is the instance: 
https://dev.azure.com/imjark/Flink/_build/results?buildId=61=logs=69332ead-8935-5abf-5b3d-e4280fb1ff4c=6855dd6e-a7b0-5fd1-158e-29fc186b16c8


{code:java}

at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

[INFO] 
[INFO] Results:
[INFO] 
[ERROR] Failures: 
[ERROR]   TableEnvironmentITCase.testExecuteSqlAndToDataStream:343
[INFO] 
[ERROR] Tests run: 791, Failures: 1, Errors: 0, Skipped: 13

{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-05-07 Thread Jark Wu
Hi,

I agree what Fabian said above.
Besides, IMO, (3) is a lower priority and will involve many more things.
It makes sense to me to do it in two phases.

Regarding (3), the key point of converting an append-only table into a
changelog table is that the framework should know the operation type,
so we introduced a special CREATE VIEW syntax for it in the documentation
[1]. Here is an example:

-- my_binlog table is registered as an append-only table
CREATE TABLE my_binlog (
  before ROW<...>,
  after ROW<...>,
  op STRING,
  op_ms TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  ...
);

-- interpret my_binlog as a changelog on the op_type and id key
CREATE VIEW my_table AS
  SELECT
after.*
  FROM my_binlog
  CHANGELOG OPERATION BY op
  UPDATE KEY BY (id);

-- my_table will materialize the insert/delete/update changes
-- if we have 4 records in dbz that
-- a create for 1004
-- an update for 1004
-- a create for 1005
-- a delete for 1004
> SELECT COUNT(*) FROM my_table;
+---+
|  COUNT(*) |
+---+
| 1 |
+---+

Best,
Jark

[1]:
https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb


On Fri, 8 May 2020 at 00:24, Fabian Hueske  wrote:

> Thanks for the summary Konstantin.
> I think you got all points right.
>
> IMO, the way forward would be to work on a FLIP to define
> * the concept of temporal tables,
> * how to feed them from retraction tables
> * how to feed them from append-only tables
> * their specification with CREATE TEMPORAL TABLE,
> * how to use temporal tables in temporal table joins
> * how (if at all) to use temporal tables in other types of queries
>
> We would keep the LATERAL TABLE syntax because it used for regular
> table-valued functions.
> However, we would probably remove the TemporalTableFunction (which is a
> built-in table-valued function) after we deprecated it for a while.
>
> Cheers, Fabian
>
> Am Do., 7. Mai 2020 um 18:03 Uhr schrieb Konstantin Knauf <
> kna...@apache.org>:
>
>> Hi everyone,
>>
>> Thanks everyone for joining the discussion on this. Please let me
>> summarize
>> what I have understood so far.
>>
>> 1) For joining an append-only table and a temporal table the syntax the
>> "FOR
>> SYSTEM_TIME AS OF " seems to be preferred (Fabian, Timo,
>> Seth).
>>
>> 2) To define a temporal table based on a changelog stream from an external
>> system CREATE TEMPORAL TABLE (as suggested by Timo/Fabian) could be used.
>> 3) In order to also support temporal tables derived from an append-only
>> stream, we either need to support TEMPORAL VIEW (as mentioned by Fabian)
>> or
>> need to have a way to convert an append-only table into a changelog table
>> (briefly discussed in [1]). It is not completely clear to me how a
>> temporal
>> table based on an append-only table would be with the syntax proposed in
>> [1] and 2). @Jark Wu  could you elaborate a bit on
>> that?
>>
>> How do we move forward with this?
>>
>> * It seems that a two-phased approach (1 + 2 now, 3 later) makes sense.
>> What do you think? * If we proceed like this, what would this mean for the
>> current syntax of LATERAL TABLE? Would we keep it? Would we eventually
>> deprecate and drop it? Since only after 3) we would be on par with the
>> current temporal table function join, I assume, we could only drop it
>> thereafter.
>>
>> Thanks, Konstantin
>>
>> [1]
>>
>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.kduaw9moein6
>>
>>
>> On Sat, Apr 18, 2020 at 3:07 PM Jark Wu  wrote:
>>
>> > Hi Fabian,
>> >
>> > Just to clarify a little bit, we decided to move the "converting
>> > append-only table into changelog table" into future work.
>> > So FLIP-105 only introduced some CDC formats (debezium) and new
>> TableSource
>> > interfaces proposed in FLIP-95.
>> > I should have started a new FLIP for the new CDC formats and keep
>> FLIP-105
>> > as it is to avoid the confusion, sorry about that.
>> >
>> > Best,
>> > Jark
>> >
>> >
>> > On Sat, 18 Apr 2020 at 00:35, Fabian Hueske  wrote:
>> >
>> > > Thanks Jark!
>> > >
>> > > I certainly need to read up on FLIP-105 (and I'll try to adjust my
>> > > terminology to changelog table from now on ;-) )
>> > > If FLIP-105 addresses the issue of converting an append-only table
>> into a
>> > > changelog table that upserts on primary key (basically what the VIEW
>> >

Re: set a retract switch

2020-05-06 Thread Jark Wu
Hi Lec,

You can use `StreamTableEnvironment#toRetractStream(table, Row.class)` to
get a `DataStream<Tuple2<Boolean, Row>>`.
The true Boolean flag indicates an add message, a false flag indicates a
retract (delete) message. So you can simply apply
a flatMap function after this to ignore the false messages. Then you can
get a pure UPSERT stream.
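
For example (a sketch; the query and table names are made up, `tEnv` is a
StreamTableEnvironment):

DataStream<Tuple2<Boolean, Row>> retractStream =
    tEnv.toRetractStream(
        tEnv.sqlQuery("SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id"),
        Row.class);

DataStream<Row> upsertOnly = retractStream
    .filter(t -> t.f0)     // keep only the add (true) messages
    .map(t -> t.f1)        // drop the flag, keep the Row
    .returns(Row.class);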

Btw, such question should be posted in u...@flink.apache.org, not the dev
mailing list.

Best,
Jark

On Thu, 7 May 2020 at 10:07, lec ssmi  wrote:

> Hi:
>  During the execution of flink, especially the sql API, many operations
> in DataStream are not available. In many cases, we don't care about the
> DELETE record when retracting. Is it possible to set a switch so that the
> DELETE record when retracting is not processed? In other words, the
> downstream only receives a value after UPDATE, and does not need to receive
> the value before UPDATE. In some programming modes, processing DELETE
> records actually makes the logic more complicated.
>
> Best
> Lec Ssmi
>


Re: [DISCUSS] Hierarchies in ConfigOption

2020-05-06 Thread Jark Wu
Thanks all for the discussion, I have updated FLIP-105 and FLIP-122 to use
the new format option keys.

FLIP-105:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289
FLIP-122:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

Best,
Jark

On Wed, 6 May 2020 at 20:37, Timo Walther  wrote:

> Cool, so let's go with:
>
> format = json
> json.fail-on-missing-field = true
> json.ignore-parse-error = true
>
> value.format = json
> value.json.fail-on-missing-field = true
> value.json.ignore-parse-error = true
>
> Regards,
> Timo
>
>
> On 06.05.20 12:39, Jingsong Li wrote:
> > Hi,
> >
> > +1 to:
> > format = parquet
> > parquet.compression = ...
> > parquet.block.size = ...
> > parquet.page.size = ...
> >
> > For the formats like parquet and orc,
> > Not just Flink itself, but this way also let Flink keys compatible with
> the
> > property keys of Hadoop / Hive / Spark.
> >
> > And like Jark said, this way works for Kafka key value too.
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, May 6, 2020 at 6:19 PM Jark Wu  wrote:
> >
> >> Hi,
> >>
> >> I think Timo proposed a good idea to make both side happy. That is:
> >>
> >> format = json
> >> json.fail-on-missing-field = true
> >> json.ignore-parse-error = true
> >>
> >> value.format = json
> >> value.json.fail-on-missing-field = true
> >> value.json.ignore-parse-error = true
> >>
> >> This is a valid hierarchies. Besides, it's more clear that the option
> >> belongs to a specific component (i.e. json).
> >> This will be more readable when we introducing more formats, e.g.
> parquet.
> >>
> >> format = parquet
> >> parquet.compression = ...
> >> parquet.block.size = ...
> >> parquet.page.size = ...
> >>
> >> is more readable than current style:
> >>
> >> format = parquet
> >> format.compression = ...
> >> format.block.size = ...
> >> format.page.size = ...
> >>
> >> To sum up, I'm +1 to use "format = json",  "json.fail-on-missing-field =
> >> true".
> >>
> >> Best,
> >> Jark
> >>
> >> On Wed, 6 May 2020 at 17:12, Danny Chan  wrote:
> >>
> >>> Hi, everyone ~
> >>>
> >>> Allows me to share some thoughts here.
> >>>
> >>> Personally i think for SQL, "format" is obviously better than "
> >> format.name",
> >>> it is more concise and straight-forward, similar with Presto FORMAT[2]
> >> and
> >>> KSQL VALUE_FORMAT[1]; i think we move from "connector.type" to
> >> "connector"
> >>> for the same reason, the "type" or "name" suffix is implicit, SQL
> syntax
> >>> like the DDL is a top-level user API, so from my side keeping good
> >>> user-friendly syntax is more important.
> >>>
> >>> @Timo I'm big +1 for the a good code style guide, but that does not
> mean
> >>> we should go for a json-style table options in the DDL, the DDL could
> >> have
> >>> its own contract. Can we move "represent these config options in YAML"
> to
> >>> another topic ? Otherwise, how should we handle the "connector" key,
> >> should
> >>> we prefix all the table options with "connector" ? The original
> inention
> >> of
> >>> FLIP-122 is to remove some redundant prefix/suffix of the table options
> >>> because they are obviously implicit there, and the "connector." prefix
> >> and
> >>> the ".type" or ".name" suffix are the ones we most want to delete.
> >>>
> >>> @Dawid Although ".type" is just another 4 characters, but we force the
> >> SQL
> >>> users to do the thing that is obvious reduadant, i know serialize
> catalog
> >>> table to YAML or use the options in DataStream has similar keys
> request,
> >>> but they are different use cases that i believe many SQL user would not
> >>> encounter, that means we force many users to obey rules for cases they
> >>> would never have.
> >>>
> >>>
> >>> [1] https://docs.ksqldb.io/en/latest/developer-guide/create-a-table/
> >>> [2] https://prestodb.io/docs/current/sql/create-table.html
> >>>
> 

Re: [DISCUSS] Hierarchies in ConfigOption

2020-05-06 Thread Jark Wu
Hi,

I think Timo proposed a good idea to make both side happy. That is:

format = json
json.fail-on-missing-field = true
json.ignore-parse-error = true

value.format = json
value.json.fail-on-missing-field = true
value.json.ignore-parse-error = true

This is a valid hierarchy. Besides, it's clearer that the option
belongs to a specific component (i.e. json).
This will be more readable when we introduce more formats, e.g. parquet.

format = parquet
parquet.compression = ...
parquet.block.size = ...
parquet.page.size = ...

is more readable than current style:

format = parquet
format.compression = ...
format.block.size = ...
format.page.size = ...

To sum up, I'm +1 to use "format = json",  "json.fail-on-missing-field =
true".

Best,
Jark

On Wed, 6 May 2020 at 17:12, Danny Chan  wrote:

> Hi, everyone ~
>
> Allows me to share some thoughts here.
>
> Personally i think for SQL, "format" is obviously better than "format.name",
> it is more concise and straight-forward, similar with Presto FORMAT[2] and
> KSQL VALUE_FORMAT[1]; i think we move from "connector.type" to "connector"
> for the same reason, the "type" or "name" suffix is implicit, SQL syntax
> like the DDL is a top-level user API, so from my side keeping good
> user-friendly syntax is more important.
>
> @Timo I'm big +1 for the a good code style guide, but that does not mean
> we should go for a json-style table options in the DDL, the DDL could have
> its own contract. Can we move "represent these config options in YAML" to
> another topic ? Otherwise, how should we handle the "connector" key, should
> we prefix all the table options with "connector" ? The original inention of
> FLIP-122 is to remove some redundant prefix/suffix of the table options
> because they are obviously implicit there, and the "connector." prefix and
> the ".type" or ".name" suffix are the ones we most want to delete.
>
> @Dawid Although ".type" is just another 4 characters, but we force the SQL
> users to do the thing that is obvious reduadant, i know serialize catalog
> table to YAML or use the options in DataStream has similar keys request,
> but they are different use cases that i believe many SQL user would not
> encounter, that means we force many users to obey rules for cases they
> would never have.
>
>
> [1] https://docs.ksqldb.io/en/latest/developer-guide/create-a-table/
> [2] https://prestodb.io/docs/current/sql/create-table.html
>
> Best,
> Danny Chan
> 在 2020年5月4日 +0800 PM11:34,Till Rohrmann ,写道:
> > Hi everyone,
> >
> > I like Timo's proposal to organize our configuration more hierarchical
> > since this is what the coding guide specifies. The benefit I see is that
> > config options belonging to the same concept will be found in the same
> > nested object. Moreover, it will be possible to split the configuration
> > into unrelated parts which are fed to the respective components. That way
> > one has a much better separation of concern since component A cannot read
> > the configuration of component B.
> >
> > Concerning Timo's last two proposals:
> >
> > If fail-on-missing-field is a common configuration shared by all formats,
> > then I would go with the first option:
> >
> > format.kind: json
> > format.fail-on-missing-field: true
> >
> > If fail-on-missing-field is specific for json, then one could go with
> >
> > format: json
> > json.fail-on-missing-field: true
> >
> > or
> >
> > format.kind: json
> > format.json.fail-on-missing-field: true
> >
> > Cheers,
> > Till
> >
> >
> > On Fri, May 1, 2020 at 11:55 AM Timo Walther  wrote:
> >
> > > Hi Jark,
> > >
> > > yes, in theory every connector can design options as they like. But for
> > > user experience and good coding style we should be consistent in Flink
> > > connectors and configuration. Because implementers of new connectors
> > > will copy the design of existing ones.
> > >
> > > Furthermore, I could image that people in the DataStream API would also
> > > like to configure their connector based on options in the near future.
> > > It might be the case that Flink DataStream API connectors will reuse
> the
> > > ConfigOptions from Table API for consistency.
> > >
> > > I'm favoring either:
> > >
> > > format.kind = json
> > > format.fail-on-missing-field: true
> > >
> > > Or:
> > >
> > > format = json
> > > json.fail-on-missing-field: true
>

[jira] [Created] (FLINK-17528) Use getters instead of RowData#get() utility in JsonRowDataSerializationSchema

2020-05-05 Thread Jark Wu (Jira)
Jark Wu created FLINK-17528:
---

 Summary: Use getters instead of RowData#get() utility in 
JsonRowDataSerializationSchema
 Key: FLINK-17528
 URL: https://issues.apache.org/jira/browse/FLINK-17528
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jark Wu


Currently, we are using the utilities {{RowData#get(RowData, int, LogicalType)}} and 
{{ArrayData#get(ArrayData, int, LogicalType)}} to get the field/element 
objects. However, this is not as efficient as the typed getters of RowData/ArrayData, 
because the utility involves boxing and unboxing and a big switch condition.
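
For illustration, a small sketch of the getter-based access (the (INT, STRING) field layout is made up; this is not the actual serializer code):

{code:java}
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class TypedGetterSketch {

    // typed getters avoid the boxing and the per-field switch of the generic utility
    static String toDebugString(RowData row) {
        int id = row.isNullAt(0) ? -1 : row.getInt(0);                // primitive access
        StringData name = row.isNullAt(1) ? null : row.getString(1);  // typed accessor
        return id + "," + name;
    }
}
{code}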



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17526) Support AVRO serialization and deserialization schema for RowData type

2020-05-05 Thread Jark Wu (Jira)
Jark Wu created FLINK-17526:
---

 Summary: Support AVRO serialization and deserialization schema for 
RowData type
 Key: FLINK-17526
 URL: https://issues.apache.org/jira/browse/FLINK-17526
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jark Wu


Add AvroRowDataDeserializationSchema and AvroRowDataSerializationSchema 
to support the new data structure RowData.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17525) Support to parse millisecond and nanoseconds for TIME type in CSV and JSON format

2020-05-05 Thread Jark Wu (Jira)
Jark Wu created FLINK-17525:
---

 Summary: Support to parse millisecond and nanoseconds for TIME 
type in CSV and JSON format
 Key: FLINK-17525
 URL: https://issues.apache.org/jira/browse/FLINK-17525
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jark Wu


Currently, the CSV and JSON formats parse time strings but ignore the millisecond 
and nanosecond parts. We should support them, because the blink planner already 
supports higher precision for TIME.
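
For illustration, plain java.time already keeps the fractional part when parsing, which is the behavior the formats should follow (a sketch, not the actual format code):

{code:java}
import java.time.LocalTime;
import java.time.temporal.ChronoField;

public class TimePrecisionExample {
    public static void main(String[] args) {
        // ISO time literal with a nanosecond fraction
        LocalTime time = LocalTime.parse("12:30:45.123456789");

        int milliOfDay = time.get(ChronoField.MILLI_OF_DAY); // millisecond precision
        long nanoOfDay = time.toNanoOfDay();                 // nanosecond precision

        System.out.println(milliOfDay);
        System.out.println(nanoOfDay);
    }
}
{code}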



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation

2020-05-04 Thread Jark Wu
Hi Andrey,

Thanks for the tuning ideas. I will explain the design of deduplication.

The mini-batch implementation of deduplication buffers a bundle of input
data on the heap (in a Java Map).
When the bundle size hits the trigger size or the trigger time fires, the buffered
data is processed together.
So we only need to access the state once per key. This is designed for the
RocksDB state backend to reduce
frequent state access and (de)serialization. And yes, this may slow down the
checkpoint, but the suggested
mini-batch timeout is <= 10s. From our production experience, it doesn't
have much impact on checkpointing.
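
For illustration, the buffering conceptually works like this (a simplified
sketch, not the actual operator code; triggerSize and deduplicateAndEmit are
placeholder names):

Map<RowData, List<RowData>> bundle = new HashMap<>();

void addToBundle(RowData key, RowData record) {
    bundle.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
    if (bundle.size() >= triggerSize) {   // simplified trigger condition
        finishBundle();
    }
}

void finishBundle() {
    for (Map.Entry<RowData, List<RowData>> entry : bundle.entrySet()) {
        // one state access per key for the whole bundle,
        // instead of one per input record
        deduplicateAndEmit(entry.getKey(), entry.getValue());
    }
    bundle.clear();
}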

Best,
Jark

On Tue, 5 May 2020 at 06:48, Andrey Zagrebin  wrote:

> Hi lsyldliu,
>
> You can try to tune the StateTtlConfig. As the documentation suggests [1]
> the TTL incremental cleanup can decrease the per record performance. This
> is the price of the automatic cleanup.
> If the only thing, which happens mostly in your operator, is working with
> state then even checking one additional record to cleanup is two times more
> actions to do.
> Timer approach was discussed in TTL feature design. It needs an additional
> implementation and keeps more state but performs only one cleanup action
> exactly when needed so it is a performance/storage trade-off.
>
> Anyways, 20x degradation looks indeed a lot.
> As a first step, I would suggest to configure the incremental cleanup
> explicitly in `StateTtlConfigUtil#createTtlConfig` with a less entries to
> check, e.g. 1 because processFirstRow/processLastRow already access the
> state twice and do cleanup:
>
> .cleanupIncrementally(1, false)
>
>
> Also not sure but depending on the input data, finishBundle can happen
> mostly during the snapshotting which slows down taking the checkpoint.
> Could this fail the checkpoint accumulating the backpressure and slowing
> down the pipeline?
>
> Not sure why to keep the deduplication data in a Java map and in Flink
> state at the same time, why not to keep it only in Flink state and
> deduplicate on each incoming record?
>
> Best,
> Andrey
>
> [1] note 2 in
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#incremental-cleanup
>
> On Wed, Apr 29, 2020 at 11:53 AM 刘大龙  wrote:
>
> >
> >
> >
> > > -原始邮件-
> > > 发件人: "Jark Wu" 
> > > 发送时间: 2020-04-29 14:09:44 (星期三)
> > > 收件人: dev , "Yu Li" ,
> > myas...@live.com
> > > 抄送: azagre...@apache.org
> > > 主题: Re: The use of state ttl incremental cleanup strategy in sql
> > deduplication resulting in significant performance degradation
> > >
> > > Hi lsyldliu,
> > >
> > > Thanks for investigating this.
> > >
> > > First of all, if you are using mini-batch deduplication, it doesn't
> > support
> > > state ttl in 1.9. That's why the tps looks the same with 1.11 disable
> > state
> > > ttl.
> > > We just introduce state ttl for mini-batch deduplication recently.
> > >
> > > Regarding to the performance regression, it looks very surprise to me.
> > The
> > > performance is reduced by 19x when StateTtlConfig is enabled in 1.11.
> > > I don't have much experience of the underlying of StateTtlConfig. So I
> > loop
> > > in @Yu Li  @YunTang in CC who may have more insights
> > on
> > > this.
> > >
> > > For more information, we use the following StateTtlConfig [1] in blink
> > > planner:
> > >
> > > StateTtlConfig
> > >   .newBuilder(Time.milliseconds(retentionTime))
> > >   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> > >
>  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> > >   .build();
> > >
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > [1]:
> > >
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27
> > >
> > >
> > >
> > >
> > >
> > > On Wed, 29 Apr 2020 at 11:53, 刘大龙  wrote:
> > >
> > > > Hi, all!
> > > >
> > > > At flink master branch, we have supported state ttl  for sql
> mini-batch
> > > > deduplication using incremental cleanup strategy on heap backend,
> > refer to
> > > > FLINK-16581. Because I want to test the performance of this feature,
> > so I
> > > > compile master branch code and deploy the jar to production
> > > > environment,then run three types of tests, re

Re: [DISCUSS] Send issue and pull request notifications for flink-web and flink-shaded to iss...@flink.apache.org

2020-05-04 Thread Jark Wu
Big +1 to this.

Best,
Jark

On Mon, 4 May 2020 at 23:44, Till Rohrmann  wrote:

> Hi everyone,
>
> due to some changes on the ASF side, we are now seeing issue and pull
> request notifications for the flink-web [1] and flink-shaded [2] repo on
> dev@flink.apache.org. I think this is not ideal since the dev ML is much
> more noisy now.
>
> I would propose to send these notifications to iss...@flink.apache.org as
> we are currently doing it for the Flink main repo [3].
>
> What do you think?
>
> [1] https://github.com/apache/flink-web
> [2] https://github.com/apache/flink-shaded
> [3] https://gitbox.apache.org/schemes.cgi?flink
>
> Cheers,
> Till
>


Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-30 Thread Jark Wu
> appealing. At the same time cost of being able to have a nice
> yaml/hocon/json representation is just adding a single suffix to a
> single(at most 2 key + value) property. The question is between `format` =
> `json` vs `format.kind` = `json`. That said I'd be slightly in favor of
> doing it.
>
> Just to have a full picture. Both cases can be represented in yaml, but
> the difference is significant:
> format: 'json'
> format.option: 'value'
>
> vs
> format:
>   kind: 'json'
>   option: 'value'
>
> Best,
> Dawid
>
> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>
> Personally I don't have any preference here.  Compliance wih standard
>
> YAML
>
> parser is probably more important
>
> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu   
> wrote:
>
>
> From a user's perspective, I prefer the shorter one "format=json",
>
> because
>
> it's more concise and straightforward. The "kind" is redundant for
>
> users.
>
> Is there a real case requires to represent the configuration in JSON
> style?
> As far as I can see, I don't see such requirement, and everything works
> fine by now.
>
> So I'm in favor of "format=json". But if the community insist to follow
> code style on this, I'm also fine with the longer one.
>
> Btw, I also CC user mailing list to listen more user's feedback.
>
> Because I
>
> think this is relative to usability.
>
> Best,
> Jark
>
> On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler  
> 
> wrote:
>
>
>  > Therefore, should we advocate instead:
>  >
>  > 'format.kind' = 'json',
>  > 'format.fail-on-missing-field' = 'false'
>
> Yes. That's pretty much it.
>
> This is reasonable important to nail down as with such violations I
> believe we could not actually switch to a standard YAML parser.
>
> On 29/04/2020 16:05, Timo Walther wrote:
>
> Hi everyone,
>
> discussions around ConfigOption seem to be very popular recently.
>
> So I
>
> would also like to get some opinions on a different topic.
>
> How do we represent hierarchies in ConfigOption? In FLIP-122, we
> agreed on the following DDL syntax:
>
> CREATE TABLE fs_table (
>  ...
> ) WITH (
>  'connector' = 'filesystem',
>  'path' = 'file:///path/to/whatever',
>  'format' = 'csv',
>  'format.allow-comments' = 'true',
>  'format.ignore-parse-errors' = 'true'
> );
>
> Of course this is slightly different from regular Flink core
> configuration but a connector still needs to be configured based on
> these options.
>
> However, I think this FLIP violates our code style guidelines
>
> because
>
> 'format' = 'json',
> 'format.fail-on-missing-field' = 'false'
>
> is an invalid hierarchy. `format` cannot be a string and a top-level
> object at the same time.
>
> We have similar problems in our runtime configuration:
>
> state.backend=
> state.backend.incremental=
> restart-strategy=
> restart-strategy.fixed-delay.delay=
> high-availability=
> high-availability.cluster-id=
>
> The code style guide states "Think of the configuration as nested
> objects (JSON style)". So such hierarchies cannot be represented in
>
> a
>
> nested JSON style.
>
> Therefore, should we advocate instead:
>
> 'format.kind' = 'json',
> 'format.fail-on-missing-field' = 'false'
>
> What do you think?
>
> Thanks,
> Timo
>
> [1]
>
>
> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking 
> UniversityTel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: [DISCUSS] FLIP-84 Feedback Summary

2020-04-30 Thread Jark Wu
> > > >>>>>>>>>> Thanks so much for the effort of `multiline statements
> > > >> supporting`,
> > > >>>>>>>>>> I have a few questions about this method:
> > > >>>>>>>>>>
> > > >>>>>>>>>> 1. users can well control the execution logic through the
> > > >> proposed
> > > >>>>>> method
> > > >>>>>>>>>>  if they know what the statements are (a statement is a
> > > DDL, a
> > > >>>> DML
> > > >>>>>> or
> > > >>>>>>>>>> others).
> > > >>>>>>>>>> but if a statement is from a file, that means users do not
> > know
> > > >>>> what
> > > >>>>>> the
> > > >>>>>>>>>> statements are,
> > > >>>>>>>>>> the execution behavior is unclear.
> > > >>>>>>>>>> As a platform user, I think this method is hard to use,
> unless
> > > >> the
> > > >>>>>>>>> platform
> > > >>>>>>>>>> defines
> > > >>>>>>>>>> a set of rule about the statements order, such as: no select
> > in
> > > >> the
> > > >>>>>>>>> middle,
> > > >>>>>>>>>> dml must be at tail of sql file (which may be the most case
> in
> > > >>>> product
> > > >>>>>>>>>> env).
> > > >>>>>>>>>> Otherwise the platform must parse the sql first, then know
> > what
> > > >> the
> > > >>>>>>>>>> statements are.
> > > >>>>>>>>>> If do like that, the platform can handle all cases through
> > > >>>>>> `executeSql`
> > > >>>>>>>>> and
> > > >>>>>>>>>> `StatementSet`.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 2. SQL client can't also use `executeMultilineSql` to
> supports
> > > >>>>>> multiline
> > > >>>>>>>>>> statements,
> > > >>>>>>>>>>  because there are some special commands introduced in
> SQL
> > > >>>> client,
> > > >>>>>>>>>> such as `quit`, `source`, `load jar` (not exist now, but
> maybe
> > > we
> > > >>>> need
> > > >>>>>>>>> this
> > > >>>>>>>>>> command
> > > >>>>>>>>>>  to support dynamic table source and udf).
> > > >>>>>>>>>> Does TableEnvironment also supports those commands?
> > > >>>>>>>>>>
> > > >>>>>>>>>> 3. btw, we must have this feature in release-1.11? I find
> > there
> > > >> are
> > > >>>>>> few
> > > >>>>>>>>>> user cases
> > > >>>>>>>>>>  in the feedback document which behavior is unclear now.
> > > >>>>>>>>>>
> > > >>>>>>>>>> regarding to "change the return value from `Iterable` to
> > > >>>>>>>>>> `Iterator`", I couldn't agree more with this change. Just as Dawid
> > mentioned
> > > >>>>>>>>>> "The contract of the Iterable#iterator is that it returns a
> > new
> > > >>>>>> iterator
> > > >>>>>>>>>> each time,
> > > >>>>>>>>>>  which effectively means we can iterate the results
> > multiple
> > > >>>>>> times.",
> > > >>>>>>>>>> we does not provide iterate the results multiple times.
> > > >>>>>>>>>> If we want do that, the client must buffer all results. but
> > it's
> > > >>>>>>>>> impossible
> > > >>>>>>>>>> for streaming job.
>

Re: [DISCUSS]Refactor flink-jdbc connector structure

2020-04-30 Thread Jark Wu
Big +1 from my side.
The new structure and class names look nicer now.

Regarding the compatibility problem, I have looked into the public APIs in
flink-jdbc. There are 3 kinds of APIs now:
1) the newly introduced JdbcSink for DataStream users in 1.11
2) JDBCAppendTableSink, JDBCUpsertTableSink, JDBCTableSource are introduced
since 1.9
3) very ancient JDBCOutputFormat and JDBCInputFormat

For (1), as it's an unreleased API, I think it's safe to move it to the new
package. cc @Khachatryan Roman  who
contributed this.
For (2), because TableSource and TableSink are no longer meant to be accessed
by users since 1.11, I think it's fine to move them.
For (3), I'm not sure how many users are still using these out-of-date
classes.
But I think it's fine to keep them for one more version, and drop them in
the next version.


Best,
Jark

On Thu, 30 Apr 2020 at 22:57, Flavio Pompermaier 
wrote:

> Very big +1 from me
>
> Best,
> Flavio
>
> On Thu, Apr 30, 2020 at 4:47 PM David Anderson 
> wrote:
>
> > I'm very happy to see the jdbc connector being normalized in this way. +1
> > from me.
> >
> > David
> >
> > On Thu, Apr 30, 2020 at 2:14 PM Timo Walther  wrote:
> >
> > > Hi Leonard,
> > >
> > > this sounds like a nice refactoring for consistency. +1 from my side.
> > >
> > > However, I'm not sure how much backwards compatibility is required.
> > > Maybe others can comment on this.
> > >
> > > Thanks,
> > > Timo
> > >
> > > On 30.04.20 14:09, Leonard Xu wrote:
> > > > Hi, dear community
> > > >
> > > > Recently, I’m thinking to refactor the flink-jdbc connector structure
> > > before release 1.11.
> > > > After the refactor, in the future,  we can easily introduce unified
> > > pluggable JDBC dialect for Table and DataStream, and we can have a
> better
> > > module organization and implementations.
> > > >
> > > > So, I propose following changes:
> > > > 1) Use `Jdbc` instead of `JDBC` in the new public API and interface
> > > name. The Datastream API `JdbcSink` which imported in this version has
> > > followed this standard.
> > > >
> > > > 2) Move all interface and classes from `org.apache.flink.java.io
> > .jdbc`(old
> > > package) to `org.apache.flink.connector.jdbc`(new package) to follow
> the
> > > base connector path in FLIP-27.
> > > > I think we can move JDBC TableSource, TableSink and factory from old
> > > package to new package because TableEnvironment#registerTableSource、
> > > TableEnvironment#registerTableSink  will be removed in 1.11 ans these
> > > classes are not exposed to users[1].
> > > > We can move Datastream API JdbcSink from old package to new package
> > > because it’s  introduced in this version.
> > > > We will still keep `JDBCInputFormat` and `JDBCOoutoutFormat` in old
> > > package and deprecate them.
> > > > Other classes/interfaces are internal used and we can move to new
> > > package without breaking compatibility.
> > > > 3) Rename `flink-jdbc` to `flink-connector-jdbc`. well, this is a
> > > compatibility broken change but in order to comply with other
> connectors
> > > and it’s real a connector rather than a flink-jdc-driver[2] we’d better
> > > decide do it ASAP.
> > > >
> > > >
> > > > What do you think? Any feedback is appreciate.
> > > >
> > > >
> > > > Best,
> > > > Leonard Xu
> > > >
> > > > [1]
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Remove-registration-of-TableSource-TableSink-in-Table-Env-and-ConnectTableDescriptor-td37270.html
> > > <
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Remove-registration-of-TableSource-TableSink-in-Table-Env-and-ConnectTableDescriptor-td37270.html
> > > >
> > > > [2]https://github.com/ververica/flink-jdbc-driver <
> > > https://github.com/ververica/flink-jdbc-driver>
> > > >
> > > >
> > >
> > >
> >
>


Re: [DISCUSS] Introduce TableFactory for StatefulSequenceSource

2020-04-30 Thread Jark Wu
Hi Konstantin,

Thanks for the link to Java Faker. It's an interesting project and
could benefit a comprehensive datagen source.

What would the discarding and printing sinks look like in your mind?
1) manually create a table with a `blackhole` or `print` connector, e.g.

CREATE TABLE my_sink (
  a INT,
  b STRNG,
  c DOUBLE
) WITH (
  'connector' = 'print'
);
INSERT INTO my_sink SELECT a, b, c FROM my_source;

2) system built-in tables named `blackhole` and `print` that need no manual
schema work, e.g.
INSERT INTO print SELECT a, b, c, d FROM my_source;

Best,
Jark



On Thu, 30 Apr 2020 at 21:19, Konstantin Knauf  wrote:

> Hi everyone,
>
> sorry for reviving this thread at this point in time. Generally, I think,
> this is a very valuable effort. Have we considered only providing a very
> basic data generator (+ discarding and printing sink tables) in Apache
> Flink and moving a more comprehensive data generating table source to an
> ecosystem project promoted on flink-packages.org. I think this has a lot
> of
> potential (e.g. in combination with Java Faker [1]), but it would probably
> be better served in a small separately maintained repository.
>
> Cheers,
>
> Konstantin
>
> [1] https://github.com/DiUS/java-faker
>
>
> On Tue, Mar 24, 2020 at 9:10 AM Jingsong Li 
> wrote:
>
> > Hi all,
> >
> > I created https://issues.apache.org/jira/browse/FLINK-16743 for
> follow-up
> > discussion. FYI.
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Mar 24, 2020 at 2:20 PM Bowen Li  wrote:
> >
> > > I agree with Jingsong that sink schema inference and system tables can
> be
> > > considered later. I wouldn’t recommend to tackle them for the sake of
> > > simplifying user experience to the extreme. Providing the above handy
> > > source and sink implementations already offer users a ton of immediate
> > > value.
> > >
> > >
> > > On Mon, Mar 23, 2020 at 20:20 Jingsong Li 
> > wrote:
> > >
> > > > Hi Benchao,
> > > >
> > > > > do you think we need to add more columns with various types?
> > > >
> > > > I didn't list all types, but we should support primitive types,
> > varchar,
> > > > Decimal, Timestamp and etc...
> > > > This can be done continuously.
> > > >
> > > > Hi Benchao, Jark,
> > > > About console and blackhole, yes, they can have no schema, the schema
> > can
> > > > be inferred by upstream node.
> > > > - But now we don't have this mechanism to do these configurable sink
> > > > things.
> > > > - If we want to support, we need a single way to support these two
> > sinks.
> > > > - And uses can use "create table like" and others way to simplify
> DDL.
> > > >
> > > > And for providing system/registered tables (`console` and
> `blackhole`):
> > > > - I have no strong opinion on these system tables. In SQL, will be
> > > "insert
> > > > into blackhole select a /*int*/, b /*string*/ from tableA", "insert
> > into
> > > > blackhole select a /*double*/, b /*Map*/, c /*string*/ from tableB".
> It
> > > > seems that Blackhole is a universal thing, which makes me feel bad
> > > > intuitively.
> > > > - Can user override these tables? If can, we need ensure it can be
> > > > overwrite by catalog tables.
> > > >
> > > > So I think we can leave these system tables to future too.
> > > > What do you think?
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > > > On Mon, Mar 23, 2020 at 4:44 PM Jark Wu  wrote:
> > > >
> > > > > Hi Jingsong,
> > > > >
> > > > > Regarding (2) and (3), I was thinking to ignore manually DDL work,
> so
> > > > users
> > > > > can use them directly:
> > > > >
> > > > > # this will log results to `.out` files
> > > > > INSERT INTO console
> > > > > SELECT ...
> > > > >
> > > > > # this will drop all received records
> > > > > INSERT INTO blackhole
> > > > > SELECT ...
> > > > >
> > > > > Here `console` and `blackhole` are system sinks which is similar to
> > > > system
> > > > > functions.
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > On Mon, 23 Mar 

Re: chinese-translation for FLINK-16091

2020-04-30 Thread Jark Wu
Hi,

Welcome to the community!
There is no contributor permission anymore; you can just comment under the JIRA
issue.
A committer will assign the issue to you if no one is working on it.

Best,
Jark


On Thu, 30 Apr 2020 at 17:36, flinker  wrote:

> Hi,
>
> I want to contribute to Apache Flink.
> Would you please give me the contributor permission?
> My JIRA ID is FLINK-16091 ;
> https://issues.apache.org/jira/browse/FLINK-16091.Thank you.


Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-30 Thread Jark Wu
Thanks for the tip! I checked it and you are right :)

On Thu, 30 Apr 2020 at 15:08, Chesnay Schepler  wrote:

> flink-sql-connector-elasticsearch6 isn't bundling com.carrotsearch:hppc,
> nor does it have dependencies on org.elasticsearch:elasticsearch-geo,
> org.elasticsearch.plugin:lang-mustache-client nor
> com.github.spullara.mustache.java:compiler (and thus is also not bundling
> them).
>
> You can check this yourself by packaging the connector and comparing the
> shade-plugin output with the NOTICE file.
>
> On 30/04/2020 08:55, Jark Wu wrote:
>
> Hi Chesnay,
>
> I mean `flink-sql-connector-elasticsearch6`.
> Because this dependency change on elasticserch7 [1] is totally following
> how elasticsearch6 does. And they have the almost same dependencies.
>
> Best,
> Jark
>
> [1]:
> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
>
> On Thu, 30 Apr 2020 at 14:44, Chesnay Schepler  wrote:
>
>> ES6 isn't bundling these dependencies.
>>
>> On 29/04/2020 17:29, Jark Wu wrote:
>> > Looks like the ES NOTICE problem is a long-standing problem, because the
>> > ES6 sql connector NOTICE also misses these dependencies.
>> >
>> > Best,
>> > Jark
>> >
>> > On Wed, 29 Apr 2020 at 17:26, Robert Metzger 
>> wrote:
>> >
>> >> Thanks for taking a look Chesnay. Then let me officially cancel the
>> >> release:
>> >>
>> >> -1 (binding)
>> >>
>> >>
>> >> Another question that I had while checking the release was the
>> >> "apache-flink-1.10.1.tar.gz" binary, which I suppose is the python
>> >> distribution.
>> >> It does not contain a LICENSE and NOTICE file at the root level (which
>> is
>> >> okay [1] for binary releases), but in the "pyflink/" directory. There
>> is
>> >> also a "deps/" directory, which contains a full distribution of Flink,
>> >> without any license files.
>> >> I believe it would be a little bit nicer to have the LICENSE and NOTICE
>> >> file in the root directory (if the python wheels format permits) to
>> make
>> >> sure it is obvious that all binary release contents are covered by
>> these
>> >> files.
>> >>
>> >>
>> >> [1]
>> >>
>> http://www.apache.org/legal/release-policy.html#licensing-documentation
>> >>
>> >>
>> >>
>> >>
>> >> On Wed, Apr 29, 2020 at 11:10 AM Congxian Qiu 
>> >> wrote:
>> >>
>> >>> Thanks a lot for creating a release candidate for 1.10.1!
>> >>>
>> >>> +1 from my side
>> >>>
>> >>> checked
>> >>> - md5/gpg, ok
>> >>> - source does not contain any binaries, ok
>> >>> - pom points to the same version 1.10.1, ok
>> >>> - README file does not contain anything unexpected, ok
>> >>> - maven clean package -DskipTests, ok
>> >>> - maven clean verify, encounter a test timeout exception, but I think
>> it
>> >>> does not block the RC(have created an issue[1] to track it),
>> >>> - run demos on a stand-alone cluster, ok
>> >>>
>> >>> [1] https://issues.apache.org/jira/browse/FLINK-17458
>> >>> Best,
>> >>> Congxian
>> >>>
>> >>>
>> >>> Robert Metzger  wrote on Wed, Apr 29, 2020 at 2:54 PM:
>> >>>
>> >>>> Thanks a lot for creating a release candidate for 1.10.1!
>> >>>>
>> >>>> I'm not sure, but I think found a potential issue in the release
>> while
>> >>>> checking dependency changes on the ElasticSearch7 connector:
>> >>>>
>> >>>>
>> >>
>> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
>> >>>> In this change, "com.carrotsearch:hppc" has been added to the shaded
>> >> jar
>> >>> (
>> >>>>
>> >>
>> https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar
>> >>>> ),
>> >>>> without including proper mention of that dependency in
>> >> "META-INF/NOTICE".
>> >>>>
>&

Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-30 Thread Jark Wu
Hi Chesnay,

I mean `flink-sql-connector-elasticsearch6`.
Because this dependency change on elasticsearch7 [1] exactly follows what
elasticsearch6 does, and they have almost the same dependencies.

Best,
Jark

[1]:
https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38

On Thu, 30 Apr 2020 at 14:44, Chesnay Schepler  wrote:

> ES6 isn't bundling these dependencies.
>
> On 29/04/2020 17:29, Jark Wu wrote:
> > Looks like the ES NOTICE problem is a long-standing problem, because the
> > ES6 sql connector NOTICE also misses these dependencies.
> >
> > Best,
> > Jark
> >
> > On Wed, 29 Apr 2020 at 17:26, Robert Metzger 
> wrote:
> >
> >> Thanks for taking a look Chesnay. Then let me officially cancel the
> >> release:
> >>
> >> -1 (binding)
> >>
> >>
> >> Another question that I had while checking the release was the
> >> "apache-flink-1.10.1.tar.gz" binary, which I suppose is the python
> >> distribution.
> >> It does not contain a LICENSE and NOTICE file at the root level (which
> is
> >> okay [1] for binary releases), but in the "pyflink/" directory. There is
> >> also a "deps/" directory, which contains a full distribution of Flink,
> >> without any license files.
> >> I believe it would be a little bit nicer to have the LICENSE and NOTICE
> >> file in the root directory (if the python wheels format permits) to make
> >> sure it is obvious that all binary release contents are covered by these
> >> files.
> >>
> >>
> >> [1]
> >> http://www.apache.org/legal/release-policy.html#licensing-documentation
> >>
> >>
> >>
> >>
> >> On Wed, Apr 29, 2020 at 11:10 AM Congxian Qiu 
> >> wrote:
> >>
> >>> Thanks a lot for creating a release candidate for 1.10.1!
> >>>
> >>> +1 from my side
> >>>
> >>> checked
> >>> - md5/gpg, ok
> >>> - source does not contain any binaries, ok
> >>> - pom points to the same version 1.10.1, ok
> >>> - README file does not contain anything unexpected, ok
> >>> - maven clean package -DskipTests, ok
> >>> - maven clean verify, encounter a test timeout exception, but I think
> it
> >>> does not block the RC(have created an issue[1] to track it),
> >>> - run demos on a stand-alone cluster, ok
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-17458
> >>> Best,
> >>> Congxian
> >>>
> >>>
> >>> Robert Metzger  wrote on Wed, Apr 29, 2020 at 2:54 PM:
> >>>
> >>>> Thanks a lot for creating a release candidate for 1.10.1!
> >>>>
> >>>> I'm not sure, but I think found a potential issue in the release while
> >>>> checking dependency changes on the ElasticSearch7 connector:
> >>>>
> >>>>
> >>
> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
> >>>> In this change, "com.carrotsearch:hppc" has been added to the shaded
> >> jar
> >>> (
> >>>>
> >>
> https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar
> >>>> ),
> >>>> without including proper mention of that dependency in
> >> "META-INF/NOTICE".
> >>>>
> >>>> My checking notes:
> >>>>
> >>>> - checked the diff for dependency changes:
> >>>>
> >>
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1
> >>>> (w/o
> >>>> <
> >>
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1(w/o
> >>>> release commit:
> >>>>
> >>>>
> >>
> https://github.com/apache/flink/compare/release-1.10.0...0e2b520ec60cc11dce210bc38e574a05fa5a7734
> >>>> )
> >>>>- flink-connector-hive sets the derby version for test-scoped
> >>>> dependencies:
> >>>>
> >>>>
> >>
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-f4dbf40e8457457eb01ae22b53baa3ec
> >>>>   - no NOTICE file found, but this module does not forward
> binaries.
> >>>

Re: [VOTE] Release 1.10.1, release candidate #1

2020-04-29 Thread Jark Wu
Looks like the ES NOTICE problem is a long-standing problem, because the
ES6 sql connector NOTICE also misses these dependencies.

Best,
Jark

On Wed, 29 Apr 2020 at 17:26, Robert Metzger  wrote:

> Thanks for taking a look Chesnay. Then let me officially cancel the
> release:
>
> -1 (binding)
>
>
> Another question that I had while checking the release was the
> "apache-flink-1.10.1.tar.gz" binary, which I suppose is the python
> distribution.
> It does not contain a LICENSE and NOTICE file at the root level (which is
> okay [1] for binary releases), but in the "pyflink/" directory. There is
> also a "deps/" directory, which contains a full distribution of Flink,
> without any license files.
> I believe it would be a little bit nicer to have the LICENSE and NOTICE
> file in the root directory (if the python wheels format permits) to make
> sure it is obvious that all binary release contents are covered by these
> files.
>
>
> [1]
> http://www.apache.org/legal/release-policy.html#licensing-documentation
>
>
>
>
> On Wed, Apr 29, 2020 at 11:10 AM Congxian Qiu 
> wrote:
>
> > Thanks a lot for creating a release candidate for 1.10.1!
> >
> > +1 from my side
> >
> > checked
> > - md5/gpg, ok
> > - source does not contain any binaries, ok
> > - pom points to the same version 1.10.1, ok
> > - README file does not contain anything unexpected, ok
> > - maven clean package -DskipTests, ok
> > - maven clean verify, encounter a test timeout exception, but I think it
> > does not block the RC(have created an issue[1] to track it),
> > - run demos on a stand-alone cluster, ok
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-17458
> > Best,
> > Congxian
> >
> >
> > Robert Metzger  wrote on Wed, Apr 29, 2020 at 2:54 PM:
> >
> > > Thanks a lot for creating a release candidate for 1.10.1!
> > >
> > > I'm not sure, but I think found a potential issue in the release while
> > > checking dependency changes on the ElasticSearch7 connector:
> > >
> > >
> >
> https://github.com/apache/flink/commit/1827e4dddfbac75a533ff2aea2f3e690777a3e5e#diff-bd2211176ab6e7fa83ffeaa89481ff38
> > >
> > > In this change, "com.carrotsearch:hppc" has been added to the shaded
> jar
> > (
> > >
> > >
> >
> https://repository.apache.org/content/repositories/orgapacheflink-1362/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.1/flink-sql-connector-elasticsearch7_2.11-1.10.1.jar
> > > ),
> > > without including proper mention of that dependency in
> "META-INF/NOTICE".
> > >
> > >
> > > My checking notes:
> > >
> > > - checked the diff for dependency changes:
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1
> > > (w/o
> > > <
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1(w/o
> > >
> > > release commit:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...0e2b520ec60cc11dce210bc38e574a05fa5a7734
> > > )
> > >   - flink-connector-hive sets the derby version for test-scoped
> > > dependencies:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-f4dbf40e8457457eb01ae22b53baa3ec
> > >  - no NOTICE file found, but this module does not forward binaries.
> > >   - kafka 0.10 minor version upgrade:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0287a3f3c37b454c583b6b56de1392e4
> > >   - NOTICE change found
> > >- ES7 changes shading:
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-bd2211176ab6e7fa83ffeaa89481ff38
> > >  - problem found
> > >   - Influxdb version change
> > >
> > >
> >
> https://github.com/apache/flink/compare/release-1.10.0...release-1.10.1-rc1#diff-0d2cce4875b2804ab89c3343a7de1ca6
> > >  - NOTICE change found
> > >
> > >
> > >
> > > On Fri, Apr 24, 2020 at 8:10 PM Yu Li  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Please review and vote on the release candidate #1 for version
> 1.10.1,
> > as
> > > > follows:
> > > > [ ] +1, Approve the release
> > > > [ ] -1, Do not approve the release (please provide specific comments)
> > > >
> > > >
> > > > The complete staging area is available for your review, which
> includes:
> > > > * JIRA release notes [1],
> > > > * the official Apache source release and binary convenience releases
> to
> > > be
> > > > deployed to dist.apache.org [2], which are signed with the key with
> > > > fingerprint D8D3D42E84C753CA5F170BDF93C07902771AB743 [3],
> > > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > > * source code tag "release-1.10.1-rc1" [5],
> > > > * website pull request listing the new release and adding
> announcement
> > > blog
> > > > post [6].
> > > >
> > > > The vote will be open for at least 72 hours. It is adopted by
> majority
> > > > approval, with at least 3 PMC affirmative votes.
> > > >
> > > > Thanks,
> > > > Yu
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> 

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Jark Wu
From a user's perspective, I prefer the shorter one "format=json", because
it's more concise and straightforward. The "kind" is redundant for users.
Is there a real case that requires representing the configuration in JSON
style? As far as I can see, there is no such requirement, and everything
works fine so far.
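
To make the two alternatives concrete, here is a minimal sketch (hypothetical
declarations, not the actual FLIP-122 code) of how the keys would be declared
with the ConfigOptions builder from org.apache.flink.configuration:

// Shorter style: "format" is both a value and a prefix for the format's own options.
ConfigOption<String> format =
    ConfigOptions.key("format").stringType().noDefaultValue();
ConfigOption<Boolean> failOnMissingField =
    ConfigOptions.key("format.fail-on-missing-field").booleanType().defaultValue(false);

// Strictly nested style: "format" is only a prefix, the identifier moves to "format.kind".
ConfigOption<String> formatKind =
    ConfigOptions.key("format.kind").stringType().noDefaultValue();

Both variants can be declared and parsed today; the question is only which key
hierarchy we expose to users.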

So I'm in favor of "format=json". But if the community insists on following
the code style on this, I'm also fine with the longer one.

Btw, I also CC the user mailing list to collect more user feedback, because
I think this is relevant to usability.

Best,
Jark

On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler  wrote:

>  > Therefore, should we advocate instead:
>  >
>  > 'format.kind' = 'json',
>  > 'format.fail-on-missing-field' = 'false'
>
> Yes. That's pretty much it.
>
> This is reasonably important to nail down, as with such violations I
> believe we could not actually switch to a standard YAML parser.
>
> On 29/04/2020 16:05, Timo Walther wrote:
> > Hi everyone,
> >
> > discussions around ConfigOption seem to be very popular recently. So I
> > would also like to get some opinions on a different topic.
> >
> > How do we represent hierarchies in ConfigOption? In FLIP-122, we
> > agreed on the following DDL syntax:
> >
> > CREATE TABLE fs_table (
> >  ...
> > ) WITH (
> >  'connector' = 'filesystem',
> >  'path' = 'file:///path/to/whatever',
> >  'format' = 'csv',
> >  'format.allow-comments' = 'true',
> >  'format.ignore-parse-errors' = 'true'
> > );
> >
> > Of course this is slightly different from regular Flink core
> > configuration but a connector still needs to be configured based on
> > these options.
> >
> > However, I think this FLIP violates our code style guidelines because
> >
> > 'format' = 'json',
> > 'format.fail-on-missing-field' = 'false'
> >
> > is an invalid hierarchy. `format` cannot be a string and a top-level
> > object at the same time.
> >
> > We have similar problems in our runtime configuration:
> >
> > state.backend=
> > state.backend.incremental=
> > restart-strategy=
> > restart-strategy.fixed-delay.delay=
> > high-availability=
> > high-availability.cluster-id=
> >
> > The code style guide states "Think of the configuration as nested
> > objects (JSON style)". So such hierarchies cannot be represented in a
> > nested JSON style.
> >
> > Therefore, should we advocate instead:
> >
> > 'format.kind' = 'json',
> > 'format.fail-on-missing-field' = 'false'
> >
> > What do you think?
> >
> > Thanks,
> > Timo
> >
> > [1]
> >
> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
> >
>
>


[jira] [Created] (FLINK-17462) Support CSV serialization and deserialization schema for RowData type

2020-04-29 Thread Jark Wu (Jira)
Jark Wu created FLINK-17462:
---

 Summary: Support CSV serialization and deserialization schema for 
RowData type
 Key: FLINK-17462
 URL: https://issues.apache.org/jira/browse/FLINK-17462
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.11.0


Add {{CsvRowDataDeserializationSchema}} and 
{{CsvRowDataSerializationSchema}} for the new data structure {{RowData}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17461) Support JSON serialization and deserialization schema for RowData type

2020-04-29 Thread Jark Wu (Jira)
Jark Wu created FLINK-17461:
---

 Summary: Support JSON serialization and deserialization schema for 
RowData type
 Key: FLINK-17461
 URL: https://issues.apache.org/jira/browse/FLINK-17461
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.11.0


Add {{JsonRowDataDeserializationSchema}} and 
{{JsonRowDataSerializationSchema}} for the new data structure {{RowData}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation

2020-04-29 Thread Jark Wu
Hi lsyldliu,

Thanks for investigating this.

First of all, if you are using mini-batch deduplication, it doesn't support
state TTL in 1.9. That's why the TPS looks the same as in 1.11 with state
TTL disabled.
We only introduced state TTL for mini-batch deduplication recently.

Regarding the performance regression, it looks very surprising to me. The
performance is reduced by 19x when StateTtlConfig is enabled in 1.11.
I don't have much experience with the internals of StateTtlConfig, so I'm
looping in @Yu Li  @YunTang in CC, who may have more insights on
this.

For more information, we use the following StateTtlConfig [1] in blink
planner:

StateTtlConfig
  .newBuilder(Time.milliseconds(retentionTime))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build();
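
I'm not sure yet which cleanup strategy FLINK-16581 configures on the heap
backend, but for reference, a sketch of enabling incremental cleanup on top of
the config above would look like this (the cleanup size of 5 is just an
illustrative value):

StateTtlConfig
  .newBuilder(Time.milliseconds(retentionTime))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  // check up to 5 entries per state access; 'false' = do not additionally clean up for every processed record
  .cleanupIncrementally(5, false)
  .build();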


Best,
Jark


[1]:
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27





On Wed, 29 Apr 2020 at 11:53, 刘大龙  wrote:

> Hi, all!
>
> At flink master branch, we have supported state ttl  for sql mini-batch
> deduplication using incremental cleanup strategy on heap backend, refer to
> FLINK-16581. Because I want to test the performance of this feature, so I
> compile master branch code and deploy the jar to production
> environment,then run three types of tests, respectively:
>
>
>
>
> flink 1.9.0 release version enable state ttl
> flink 1.11-snapshot version disable state ttl
> flink 1.11-snapshot version enable state ttl
>
>
>
>
> The test query sql as follows:
>
> select order_date,
> sum(price * amount - goods_all_fav_amt - virtual_money_amt +
> goods_carriage_amt) as saleP,
> sum(amount) as saleN,
> count(distinct parent_sn) as orderN,
> count(distinct user_id) as cusN
>from(
> select order_date, user_id,
> order_type, order_status, terminal, last_update_time,
> goods_all_fav_amt,
> goods_carriage_amt, virtual_money_amt, price, amount,
> order_quality, quality_goods_cnt, acture_goods_amt
> from (select *, row_number() over(partition by order_id,
> order_goods_id order by proctime desc) as rownum from dm_trd_order_goods)
> where rownum=1
> and (order_type in (1,2,3,4,5) or order_status = 70)
> and terminal = 'shop' and price > 0)
> group by order_date
>
>
> At runtime, this query will generate two operators which include
> Deduplication and GroupAgg. In the test, the configuration is same,
> parallelism is 20, set kafka consumer from the earliest, and disable
> mini-batch function, The test results as follows:
>
> flink 1.9.0 enable state ttl:this test lasted 44m, flink receive 1374w
> records, average tps at 5200/s, Flink UI picture link back pressure,
> checkpoint
> flink 1.11-snapshot version disable state ttl:this test lasted 28m, flink
> receive 883w records, average tps at 5200/s, Flink UI picture link back
> pressure, checkpoint
> flink 1.11-snapshot version enable state ttl:this test lasted 1h 43m,
> flink only receive 168w records because of deduplication operator serious
> back pressure, average tps at 270/s, moreover, checkpoint always fail
> because of deduplication operator serious back pressure, Flink UI picture
> link back pressure, checkpoint
>
> Deduplication state clean up implement in flink 1.9.0 use timer, but
> 1.11-snapshot version use StateTtlConfig, this is the main difference.
> Comparing the three tests comprehensively, we can see that if disable state
> ttl in 1.11-snapshot the performance is the same with 1.9.0 enable state
> ttl. However, if enable state ttl in 1.11-snapshot, performance down is
> nearly 20 times, so I think incremental cleanup strategy cause this
> problem, what do you think about it? @azagrebin, @jark.
>
> Thanks.
>
> lsyldliu
>
> Zhejiang University, College of Control Science and engineer, CSC


[jira] [Created] (FLINK-17437) Use StringData instead of BinaryStringData in code generation

2020-04-28 Thread Jark Wu (Jira)
Jark Wu created FLINK-17437:
---

 Summary: Use StringData instead of BinaryStringData in code 
generation
 Key: FLINK-17437
 URL: https://issues.apache.org/jira/browse/FLINK-17437
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Jark Wu


In FLINK-16996, we forced the use of {{BinaryStringData}} instead of {{StringData}} 
in code generation. We hard-cast the {{StringData}} returned by 
{{RowData#getString}} to {{BinaryStringData}}. The motivation is that this makes it 
easy for the code generator to generate operations based on strings. There are too 
many invocations on {{BinaryStringData}} now; if we used {{StringData}} in code 
generation, we would have to refactor a lot of code.

However, this may be worth doing, because using the public interface 
{{StringData}} is more straightforward.
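
A small illustration of the difference (sketch only, not actual generated code; 
assume {{row}} is a {{RowData}}):

{code:java}
// Current approach: the generated code hard-casts the public interface to the
// internal class, so the rich binary string operations are directly available.
BinaryStringData s = (BinaryStringData) row.getString(0);

// Alternative: the generated code only works against the public interface.
StringData s2 = row.getString(0);
{code}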



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17430) Support SupportsPartitioning in planner

2020-04-28 Thread Jark Wu (Jira)
Jark Wu created FLINK-17430:
---

 Summary: Support SupportsPartitioning in planner
 Key: FLINK-17430
 URL: https://issues.apache.org/jira/browse/FLINK-17430
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
 Environment: 

Reporter: Jark Wu


Support the {{SupportsPartitioning}} interface for {{DynamicTableSink}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17429) Support SupportsOverwrite in planner

2020-04-28 Thread Jark Wu (Jira)
Jark Wu created FLINK-17429:
---

 Summary: Support SupportsOverwrite in planner
 Key: FLINK-17429
 URL: https://issues.apache.org/jira/browse/FLINK-17429
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
 Environment: 


Reporter: Jark Wu


Support the {{SupportsOverwrite}} interface for {{DynamicTableSink}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17428) Support SupportsProjectionPushDown in planner

2020-04-28 Thread Jark Wu (Jira)
Jark Wu created FLINK-17428:
---

 Summary: Support SupportsProjectionPushDown in planner
 Key: FLINK-17428
 URL: https://issues.apache.org/jira/browse/FLINK-17428
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
 Environment: 


Reporter: Jark Wu


Support the {{SupportsProjectionPushDown}} interface for {{ScanTableSource}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17427) Support SupportsPartitionPushDown in planner

2020-04-28 Thread Jark Wu (Jira)
Jark Wu created FLINK-17427:
---

 Summary: Support SupportsPartitionPushDown in planner
 Key: FLINK-17427
 URL: https://issues.apache.org/jira/browse/FLINK-17427
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jark Wu


Support the {{SupportsPartitionPushDown}} interface for {{ScanTableSource}}.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17426) Support SupportsLimitPushDown in planner

2020-04-28 Thread Jark Wu (Jira)
Jark Wu created FLINK-17426:
---

 Summary: Support SupportsLimitPushDown in planner
 Key: FLINK-17426
 URL: https://issues.apache.org/jira/browse/FLINK-17426
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jark Wu


Support the {{SupportsLimitPushDown}} interface for {{ScanTableSource}}.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17425) Support SupportsFilterPushDown in planner

2020-04-28 Thread Jark Wu (Jira)
Jark Wu created FLINK-17425:
---

 Summary: Support SupportsFilterPushDown in planner
 Key: FLINK-17425
 URL: https://issues.apache.org/jira/browse/FLINK-17425
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jark Wu


Support the {{SupportsFilterPushDown}} interface for {{ScanTableSource}}.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Question about FLIP-66

2020-04-27 Thread Jark Wu
Hi Jungtaek,

Yes. Your understanding is correct :)

Best,
Jark

On Tue, 28 Apr 2020 at 11:58, Jungtaek Lim 
wrote:

> Thanks Kurt and Jark for the detailed explanation! Pretty much helped to
> understand about FLIP-66.
>
> That sounds as Flink won't leverage timestamp in StreamRecord (which is
> hidden and cannot modified easily) and handles the time semantic by the
> input schema for the operation, to unify the semantic between batch and
> stream. Did I understand it correctly?
>
> I'm not familiar with internal of Flink so not easy to consume the
> information in FLINK-11286, but in general I'd be supportive with defining
> watermark as close as possible from source, as it'll be easier to reason
> about. (I basically refer to timestamp assigner instead of watermark
> assigner though.)
>
> - Jungtaek Lim
>
> On Tue, Apr 28, 2020 at 11:37 AM Jark Wu  wrote:
>
> > Hi Jungtaek,
> >
> > Kurt has said what I want to say. I will add some background.
> > Flink Table API & SQL only supports to define processing-time attribute
> and
> > event-time attribute (watermark) on source, not support to define a new
> one
> > in query.
> > The time attributes will pass through the query and time-based operations
> > can only apply on the time attributes.
> >
> > The reason why Flink Table & SQL only supports to define watermark on
> > source is that this can allow us to do per-partition watermark, source
> idle
> > and simplify things.
> > There are also some discussion about "disable arbitrary watermark
> assigners
> > in the middle of a pipeline in DataStream" in this JIRA issue comments.
> >
> > Best,
> > Jark
> >
> > [1]: https://issues.apache.org/jira/browse/FLINK-11286
> >
> >
> > On Tue, 28 Apr 2020 at 09:28, Kurt Young  wrote:
> >
> > > The current behavior is later. Flink gets time attribute column from
> > source
> > > table, and tries to analyze and keep
> > > the time attribute column as much as possible, e.g. simple projection
> or
> > > filter which doesn't effect the column
> > > will keep the time attribute, window aggregate will generate its own
> time
> > > attribute if you select window_start or
> > > window_end. But you're right, sometimes framework will loose the
> > > information about time attribute column, and
> > > after that, some operations will throw exception.
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Tue, Apr 28, 2020 at 7:45 AM Jungtaek Lim <
> > kabhwan.opensou...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I'm interesting about the new change on FLIP-66 [1], because if I
> > > > understand correctly, Flink hasn't been having event-time timestamp
> > field
> > > > (column) as a part of "normal" schema, and FLIP-66 tries to change
> it.
> > > >
> > > > That sounds as the column may be open for modification, like rename
> > > (alias)
> > > > or some other operations, or even be dropped via projection. Will
> such
> > > > operations affect event-time timestamp for the record? If you have an
> > > idea
> > > > about how Spark Structured Streaming works with watermark then you
> > might
> > > > catch the point.
> > > >
> > > > Maybe the question could be reworded as, does the definition of event
> > > time
> > > > timestamp column on DDL only project to the source definition, or it
> > will
> > > > carry over the entire query and let operator determine such column as
> > > > event-time timestamp. (SSS works as latter.) I think this is a huge
> > > > difference, as for me it's like stability vs flexibility, and
> there're
> > > > drawbacks on latter (there're also drawbacks on former as well, but
> > > > computed column may cover up).
> > > >
> > > > Thanks in advance!
> > > > Jungtaek Lim (HeartSaVioR)
> > > >
> > > > 1.
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL
> > > >
> > >
> >
>


Re: Question about FLIP-66

2020-04-27 Thread Jark Wu
Hi Jungtaek,

Kurt has said what I want to say. I will add some background.
Flink Table API & SQL only supports defining the processing-time attribute and
the event-time attribute (watermark) on the source; it does not support
defining a new one within a query.
The time attributes pass through the query, and time-based operations can only
be applied to those time attributes.

The reason why Flink Table & SQL only supports defining the watermark on the
source is that this allows us to do per-partition watermarks and source
idleness handling, and it simplifies things.
There is also some discussion about "disabling arbitrary watermark assigners
in the middle of a pipeline in DataStream" in the comments of the JIRA issue [1].
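
To illustrate what "define the watermark on the source" means, here is a rough
sketch (table name, columns and connector options are made up, and `tableEnv`
is assumed to be a TableEnvironment using the blink planner):

// The event-time attribute and its watermark are declared in the source DDL.
tableEnv.sqlUpdate(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  amount DECIMAL(10, 2)," +
    "  order_time TIMESTAMP(3)," +
    "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")");

// Downstream queries can use order_time as the event-time attribute, but they
// cannot assign a new watermark in the middle of the query.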

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-11286


On Tue, 28 Apr 2020 at 09:28, Kurt Young  wrote:

> The current behavior is later. Flink gets time attribute column from source
> table, and tries to analyze and keep
> the time attribute column as much as possible, e.g. simple projection or
> filter which doesn't effect the column
> will keep the time attribute, window aggregate will generate its own time
> attribute if you select window_start or
> window_end. But you're right, sometimes framework will loose the
> information about time attribute column, and
> after that, some operations will throw exception.
>
> Best,
> Kurt
>
>
> On Tue, Apr 28, 2020 at 7:45 AM Jungtaek Lim  >
> wrote:
>
> > Hi devs,
> >
> > I'm interesting about the new change on FLIP-66 [1], because if I
> > understand correctly, Flink hasn't been having event-time timestamp field
> > (column) as a part of "normal" schema, and FLIP-66 tries to change it.
> >
> > That sounds as the column may be open for modification, like rename
> (alias)
> > or some other operations, or even be dropped via projection. Will such
> > operations affect event-time timestamp for the record? If you have an
> idea
> > about how Spark Structured Streaming works with watermark then you might
> > catch the point.
> >
> > Maybe the question could be reworded as, does the definition of event
> time
> > timestamp column on DDL only project to the source definition, or it will
> > carry over the entire query and let operator determine such column as
> > event-time timestamp. (SSS works as latter.) I think this is a huge
> > difference, as for me it's like stability vs flexibility, and there're
> > drawbacks on latter (there're also drawbacks on former as well, but
> > computed column may cover up).
> >
> > Thanks in advance!
> > Jungtaek Lim (HeartSaVioR)
> >
> > 1.
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL
> >
>


Re: [DISCUSS] Should max/min be part of the hierarchy of config option?

2020-04-27 Thread Jark Wu
+1 for xyz.[min|max]

This is already mentioned in the Code Style Guideline [1].

Best,
Jark


[1]:
https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes

On Mon, 27 Apr 2020 at 21:33, Flavio Pompermaier 
wrote:

> +1 for Chesnay approach
>
> On Mon, Apr 27, 2020 at 2:31 PM Chesnay Schepler 
> wrote:
>
> > +1 for xyz.[min|max]; imo it becomes obvious if think of it like a yaml
> > file:
> >
> > xyz:
> >  min:
> >  max:
> >
> > opposed to
> >
> > min-xyz:
> > max-xyz:
> >
> > IIRC this would also be more in-line with the hierarchical scheme for
> > config options we decided on months ago.
> >
> > On 27/04/2020 13:25, Xintong Song wrote:
> > > +1 for Robert's idea about adding tests/tools checking the pattern of
> new
> > > configuration options, and migrate the old ones in release 2.0.
> > >
> > > Concerning the preferred pattern, I personally agree with Till's
> > opinion. I
> > > think 'xyz.[min|max]' somehow expresses that 'min' and 'max' are
> > properties
> > > of 'xyz', while 'xyz' may also have other properties. An example could
> be
> > > 'taskmanager.memory.network.[min|max|fraction]'.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Mon, Apr 27, 2020 at 6:00 PM Till Rohrmann 
> > wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> as Robert said I think the problem is that we don't have strict
> > guidelines
> > >> and every committer follows his/her personal taste. I'm actually not
> > sure
> > >> whether we can define bullet-proof guidelines but we can definitely
> > >> make them more concrete.
> > >>
> > >> In this case here, I have to admit that I have an opposing
> > opinion/taste. I
> > >> think it would be better to use xyz.min and xyz.max. The reason is
> that
> > we
> > >> configure a property of xyz which consists of the minimum and maximum
> > >> value. Differently said, min and max belong semantically together and
> > hence
> > >> should be defined together. You can think of it as if the type of the
> > xyz
> > >> config option would be a tuple of two integers instead of two
> individual
> > >> integers.
> > >>
> > >> A comment concerning the existing styles of config options: I think
> > many of
> > >> the config options which follow the max-xzy pattern are actually older
> > >> configuration options.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Mon, Apr 27, 2020 at 10:34 AM Robert Metzger 
> > >> wrote:
> > >>
> > >>> Thanks for starting this discussion.
> > >>> I believe the different options are a lot about personal taste, there
> > are
> > >>> no objective arguments why one option is better than the other.
> > >>>
> > >>> I agree with your proposal to simply go with the "max-xyz" pattern,
> as
> > >> this
> > >>> is the style of the majority of the current configuration options in
> > >> Flink
> > >>> (maybe this also means it is the taste of the majority of Flink
> > >>> developers?).
> > >>>
> > >>> I would propose to add a test or some tooling that checks that all
> new
> > >>> configuration parameters follow this pattern, as well as tickets for
> > >> Flink
> > >>> 2.0 to migrate the "wrong" configuration options.
> > >>>
> > >>>
> > >>>
> > >>> On Wed, Apr 22, 2020 at 5:47 AM Yangze Guo 
> wrote:
> > >>>
> >  Hi, everyone,
> > 
> >  I'm working on FLINK-16605 Add max limitation to the total number of
> >  slots[1]. In the PR, I, Gary and Xintong has a discussion[2] about
> the
> >  config option of this limit.
> >  The central question is whether the "max" should be part of the
> >  hierarchy or part of the property itself.
> > 
> >  It means there could be two patterns:
> >  - max-xyz
> >  - xyz.max
> > 
> >  Currently, there is no clear consensus on which style is better and
> we
> >  could find both patterns in the current Flink. Here, I'd like to
> first
> >  sort out[3]:
> > 
> >  Config options follow the "max-xyz" pattern:
> >  - restart-strategy.failure-rate.max-failures-per-interval
> >  - yarn.maximum-failed-containers
> >  - state.backend.rocksdb.compaction.level.max-size-level-base
> >  - cluster.registration.max-timeout
> >  - high-availability.zookeeper.client.max-retry-attempts
> >  - rest.client.max-content-length
> >  - rest.retry.max-attempts
> >  - rest.server.max-content-length
> >  - jobstore.max-capacity
> >  - taskmanager.registration.max-backoff
> >  - compiler.delimited-informat.max-line-samples
> >  - compiler.delimited-informat.min-line-samples
> >  - compiler.delimited-informat.max-sample-len
> >  - taskmanager.runtime.max-fan
> >  - pipeline.max-parallelism
> >  - execution.checkpointing.max-concurrent-checkpoint
> >  - execution.checkpointing.min-pause
> > 
> >  Config options follow the "xyz.max" pattern:
> >  - taskmanager.memory.jvm-overhead.max
> >  - taskmanager.memory.jvm-overhead.min
> >  - 

Re: [ANNOUNCE] Apache Flink 1.9.3 released

2020-04-26 Thread Jark Wu
Thanks Dian for being the release manager and thanks all who make this
possible.

Best,
Jark

On Sun, 26 Apr 2020 at 18:06, Leonard Xu  wrote:

> Thanks Dian for the release and being the release manager !
>
> Best,
> Leonard Xu
>
>
> On Apr 26, 2020, at 17:58, Benchao Li  wrote:
>
> Thanks Dian for the effort, and all who make this release possible. Great
> work!
>
> Konstantin Knauf  wrote on Sun, Apr 26, 2020 at 5:21 PM:
>
>> Thanks for managing this release!
>>
>> On Sun, Apr 26, 2020 at 3:58 AM jincheng sun 
>> wrote:
>>
>>> Thanks for your great job, Dian!
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>>> Hequn Cheng  wrote on Sat, Apr 25, 2020 at 8:30 PM:
>>>
 @Dian, thanks a lot for the release and for being the release manager.
 Also thanks to everyone who made this release possible!

 Best,
 Hequn

 On Sat, Apr 25, 2020 at 7:57 PM Dian Fu  wrote:

> Hi everyone,
>
> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.9.3, which is the third bugfix release for the Apache Flink
> 1.9 series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data 
> streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> https://flink.apache.org/news/2020/04/24/release-1.9.3.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12346867
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> Also great thanks to @Jincheng for helping finalize this release.
>
> Regards,
> Dian
>

>>
>> --
>> Konstantin Knauf | Head of Product
>> +49 160 91394525
>>
>> Follow us @VervericaData Ververica 
>>
>> --
>> Join Flink Forward  - The Apache Flink
>> Conference
>> Stream Processing | Event Driven | Real Time
>> --
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Tony) Cheng
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>


[jira] [Created] (FLINK-17385) Fix precision problem when converting JDBC numeric into Flink decimal type

2020-04-25 Thread Jark Wu (Jira)
Jark Wu created FLINK-17385:
---

 Summary: Fix precision problem when converting JDBC numeric into 
Flink decimal type
 Key: FLINK-17385
 URL: https://issues.apache.org/jira/browse/FLINK-17385
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC, Table SQL / Ecosystem
Reporter: Jark Wu


This is reported in the mailing list: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JDBC-error-on-numeric-conversion-because-of-DecimalType-MIN-PRECISION-td34668.html





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Exact feature freeze date

2020-04-23 Thread Jark Wu
+1

Thanks,
Jark

On Thu, 23 Apr 2020 at 22:36, Xintong Song  wrote:

> +1
> From our side we can also benefit from the extending of feature freeze, for
> pluggable slot allocation, GPU support and perjob mode on Kubernetes
> deployment.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Apr 23, 2020 at 10:31 PM Timo Walther  wrote:
>
> >  From the SQL side, I'm sure that FLIP-95 and FLIP-105 could benefit
> > from extending the feature freeze.
> >
> > Thanks,
> > Timo
> >
> > On 23.04.20 16:11, Aljoscha Krettek wrote:
> > > +1
> > >
> > > Aljoscha
> > >
> > > On 23.04.20 15:23, Till Rohrmann wrote:
> > >> +1 for extending the feature freeze until May 15th.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Thu, Apr 23, 2020 at 1:00 PM Piotr Nowojski 
> > >> wrote:
> > >>
> > >>> Hi Stephan,
> > >>>
> > >>> As release manager I’ve seen that quite a bit of features could use
> > >>> of the
> > >>> extra couple of weeks. This also includes some features that I’m
> > >>> involved
> > >>> with, like FLIP-76, or limiting the in-flight buffers.
> > >>>
> > >>> +1 From my side for extending the feature freeze until May 15th.
> > >>>
> > >>> Piotrek
> > >>>
> >  On 23 Apr 2020, at 10:10, Stephan Ewen  wrote:
> > 
> >  Hi all!
> > 
> >  I want to bring up a discussion about when we want to do the feature
> > >>> freeze
> >  for 1.11.
> > 
> >  When kicking off the release cycle, we tentatively set the date to
> >  end of
> >  April, which would be in one week.
> > 
> >  I can say from the features I am involved with (FLIP-27, FLIP-115,
> >  reviewing some state backend improvements, etc.) that it would be
> >  helpful
> >  to have two additional weeks.
> > 
> >  When looking at various other feature threads, my feeling is that
> > there
> > >>> are
> >  more contributors and committers that could use a few more days.
> >  The last two months were quite exceptional in and we did lose a bit
> of
> >  development speed here and there.
> > 
> >  How do you think about making *May 15th* the feature freeze?
> > 
> >  Best,
> >  Stephan
> > >>>
> > >>>
> > >>
> >
> >
>


[jira] [Created] (FLINK-17337) Send UPDATE messages instead of INSERT and DELETE in streaming join operator

2020-04-23 Thread Jark Wu (Jira)
Jark Wu created FLINK-17337:
---

 Summary: Send UPDATE messages instead of INSERT and DELETE in 
streaming join operator
 Key: FLINK-17337
 URL: https://issues.apache.org/jira/browse/FLINK-17337
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Jark Wu


Currently, the streaming join operator always sends INSERT and DELETE messages for 
simplification if it's not an inner join. However, we can send UPDATE_BEFORE and 
UPDATE_AFTER messages instead of INSERT and DELETE. For example, when we 
receive the right record "b", we can send {{UB[a, null]}} and {{UA[a,b]}} 
instead of {{D[a,null]}}, {{I[a,b]}}. This is an optimization, because UB can 
be omitted in some cases to reduce IO cost and computation.
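
A sketch of the intended change (illustration only, not the actual operator code;
{{out}} is assumed to be the operator's output collector):

{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

// When the right record "b" arrives for the existing left record "a", emit an
// UPDATE_BEFORE / UPDATE_AFTER pair instead of a DELETE + INSERT pair.
static void emitAsUpdate(Collector<RowData> out) {
    RowData updateBefore =
            GenericRowData.ofKind(RowKind.UPDATE_BEFORE, StringData.fromString("a"), null);
    RowData updateAfter =
            GenericRowData.ofKind(RowKind.UPDATE_AFTER, StringData.fromString("a"), StringData.fromString("b"));
    out.collect(updateBefore); // the UPDATE_BEFORE can be dropped downstream when it is not needed
    out.collect(updateAfter);
}
{code}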



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-04-18 Thread Jark Wu
Hi Fabian,

Just to clarify a little bit, we decided to move the "converting
append-only table into changelog table" into future work.
So FLIP-105 only introduced some CDC formats (debezium) and new TableSource
interfaces proposed in FLIP-95.
I should have started a new FLIP for the new CDC formats and kept FLIP-105
as it is to avoid the confusion; sorry about that.

Best,
Jark


On Sat, 18 Apr 2020 at 00:35, Fabian Hueske  wrote:

> Thanks Jark!
>
> I certainly need to read up on FLIP-105 (and I'll try to adjust my
> terminology to changelog table from now on ;-) )
> If FLIP-105 addresses the issue of converting an append-only table into a
> changelog table that upserts on primary key (basically what the VIEW
> definition in my first email did),
> TEMPORAL VIEWs become much less important.
> In that case, we would be well served with TEMPORAL TABLE and TEMPORAL VIEW
> would be a nice-to-have feature for some later time.
>
> Cheers, Fabian
>
>
>
>
>
>
> On Fri, Apr 17, 2020 at 18:13, Jark Wu  wrote:
>
> > Hi Fabian,
> >
> > I think converting an append-only table into temporal table contains two
> > things:
> > (1) converting append-only table into changelog table (or retraction
> table
> > as you said)
> > (2) define the converted changelog table (maybe is a view now) as
> temporal
> > (or history tracked).
> >
> > The first thing is also mentioned and discussed in FLIP-105 design draft
> > [1] which proposed a syntax
> > to convert the append-only table into a changelog table.
> >
> > I think TEMPORAL TABLE is quite straightforward and simple, and can
> satisfy
> > most existing changelog
> > data with popular CDC formats. TEMPORAL VIEW is flexible but will involve
> > more SQL codes. I think
> > we can support them both.
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> >
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
> >
> > On Fri, 17 Apr 2020 at 23:52, Fabian Hueske  wrote:
> >
> > > Hi,
> > >
> > > I agree with most of what Timo said.
> > >
> > > The TEMPORAL keyword (which unfortunately might be easily confused with
> > > TEMPORARY...) looks very intuitive and I think using the only time
> > > attribute for versioning would be a good choice.
> > >
> > > However, TEMPORAL TABLE on retraction tables do not solve the full
> > problem.
> > > I believe there will be also cases where we need to derive a temporal
> > table
> > > from an append only table (what TemporalTableFunctions do right now).
> > > I think the best choice for this would be TEMPORAL VIEW but as I
> > explained,
> > > it might be a longer way until this can be supported.
> > > TEMPORAL VIEW would also address the problem of preprocessing.
> > >
> > > > Regarding retraction table with a primary key and a time-attribute:
> > > > These semantics are still unclear to me. Can retractions only occur
> > > > within watermarks? Or are they also used for representing late
> updates?
> > >
> > > Time attributes and retraction streams are a challenging topic that I
> > > haven't completely understood yet.
> > > So far we treated time attributes always as part of the data.
> > > In combination with retractions, it seems that they become metadata
> that
> > > specifies when a change was done.
> > > I think this is different from treating time attributes as regular
> data.
> > >
> > > Cheers, Fabian
> > >
> > >
> > > Am Fr., 17. Apr. 2020 um 17:23 Uhr schrieb Seth Wiesman <
> > > sjwies...@gmail.com
> > > >:
> > >
> > > > I really like the TEMPORAL keyword, I find it very intuitive.
> > > >
> > > > The down side of this approach would be that an additional
> > preprocessing
> > > > > step would not be possible anymore because there is no preceding
> > view.
> > > > >
> > > >
> > > >  Yes and no. My understanding is we are not talking about making any
> > > > changes to how temporal tables are defined in the table api. Since
> you
> > > > cannot currently define temporal table functions in pure SQL
> > > applications,
> > > > but only pre-register them in YAML, you can't do any pre-processing
> as
> > it
> > > > stands today. Preprocessing may be a generally useful feature, I'm
> not
> > > > sure, but this s

Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-04-17 Thread Jark Wu
> parameter
> > > > to a VIEW not so much.
> > > > * Users would need to specify the VIEW exactly correct, such that it
> > can
> > > be
> > > > used as a temporal table. Look at 1.1 why this is not trivial.
> > > >
> > > > There is two ways to use a TableFunction:
> > > >
> > > > ### 1.3.1 Built-in and pre-registered function that is parameterized
> in
> > > the
> > > > SQL query
> > > >
> > > > Here, we do not need to do anything to register the function. We
> simply
> > > use
> > > > it in the query (see example in 2.2 below)
> > > >
> > > > ### 1.3.2 Parameterize function when it is registered in the catalog
> > > (with
> > > > a provided Java implementation)
> > > >
> > > > This is the approach, we've used so far. In the Table API, the
> function
> > > is
> > > > first parameterized and created and then registered:
> > > > We would need a DDL syntax to parameterize UDFs on registration.
> > > > I don't want to propose a syntax here, but just to get an idea it
> might
> > > > look like this:
> > > >
> > > > CREATE FUNCTION rates AS
> > > > 'org.apache.flink.table.udfs.TemporalTableFunction' WITH ('table' =
> > > > 'rates_history', 'key' = 'cur', 'time' = 'rowtime')
> > > >
> > > > Right now, the Flink Catalog interface does not have the
> functionality
> > to
> > > > store such parameters and would need some hacks to properly create
> > > properly
> > > > parameterize function instances.
> > > >
> > > >
> > > >
> > > > # 2 Defining a join of an append-only table and a temporal table
> > > >
> > > > The append-only table needs to have a time-attribute (processing time
> > or
> > > > event time, but same as the temporal table).
> > > > The join then needs to specify two things:
> > > > * an equality predicate that includes the primary key of the temporal
> > > table
> > > > * declare the time attribute of the append-only table as the time as
> of
> > > > which to look up the temporal table, i.e, get the version of the
> > temporal
> > > > table that is valid for the timestamp of the current row from the
> > > > append-only table
> > > >
> > > > The tricky part (from a syntax point of view) is to specify the
> lookup
> > > > time.
> > > >
> > > > ## 2.1 the temporal table is a regular table or view (see approaches
> > 1.1
> > > > and 1.2 above)
> > > >
> > > > In this case we can use the "FOR SYSTEM_TIME AS OF x" clause as
> > follows:
> > > >
> > > > SELECT *
> > > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
> > > > WHERE o.currency = r.currency
> > > >
> > > > IMO, this is a great syntax and the one we should strive for.
> > > > We would need to bend the rules of the SQL standard which only
> allows x
> > > in
> > > > "FOR SYSTEM_TIME AS OF x" to be a constant and the table on which it
> is
> > > > applied usually needs to be a specific type (not sure if views are
> > > > supported), but I guess this is fine.
> > > > NOTE: the "FOR SYSTEM_TIME AS OF x" is already supported for
> > LookupTable
> > > > Joins if x is a processing time attribute [2].
> > > >
> > > > ## 2.2 the temporal table is a TableFunction and parameterized in the
> > > query
> > > > (see 1.3.1 above)
> > > >
> > > > SELECT *
> > > > FROM orders o,
> > > >TEMPORAL_TABLE(
> > > >  table => TABLE(rates_history),
> > > >  key => DESCRIPTOR(currency),
> > > >  time => DESCRIPTOR(rowtime)) r
> > > >ON o.currency = r.currency
> > > >
> > > > The function "TEMPORAL_TABLE" is built-in and nothing was registered
> in
> > > the
> > > > catalog (except the rates_history table).
> > > > In fact this is valid SQL:2016 syntax and called Polymorphic Table
> > > > Functions. Have a look here [3].
> > > >
> > > > ## 2.3 the temporal table is a TableFunction that was parameterized
> > > during
> > > > registration (see 1.3.2 above)
>

Re: [ANNOUNCE] New Apache Flink PMC Member - Hequn Chen

2020-04-17 Thread Jark Wu
Congratulations Hequn!

Best,
Jark

On Fri, 17 Apr 2020 at 15:32, Yangze Guo  wrote:

> Congratulations!
>
> Best,
> Yangze Guo
>
> On Fri, Apr 17, 2020 at 3:19 PM Jeff Zhang  wrote:
> >
> > Congratulations, Hequn!
> >
> > Paul Lam  wrote on Fri, Apr 17, 2020 at 3:02 PM:
> >
> > > Congrats Hequn! Thanks a lot for your contribution to the community!
> > >
> > > Best,
> > > Paul Lam
> > >
> > > Dian Fu  wrote on Fri, Apr 17, 2020 at 2:58 PM:
> > >
> > > > Congratulations, Hequn!
> > > >
> > > > > On Apr 17, 2020, at 2:36 PM, Becket Qin  wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I am glad to announce that Hequn Chen has joined the Flink PMC.
> > > > >
> > > > > Hequn has contributed to Flink for years. He has worked on several
> > > > > components including Table / SQL,PyFlink and Flink ML Pipeline.
> > > Besides,
> > > > > Hequn is also very active in the community since the beginning.
> > > > >
> > > > > Congratulations, Hequn! Looking forward to your future
> contributions.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > > (On behalf of the Apache Flink PMC)
> > > >
> > > >
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
>


Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

2020-04-16 Thread Jark Wu
Hi Konstantin,

Thanks for bringing up this discussion. I think temporal join is a very
important feature and should be exposed to pure SQL users.
I have already received many requests like this.
However, my concern is how to properly support this feature in SQL.
Introducing a DDL syntax for Temporal Table Function is one way, but maybe
not the best one.

The most important reason is that what underlies a temporal table function
is exactly a changelog stream.
The temporal join is actually joining a fact stream with the changelog stream
on processing time or event time.
We will soon support creating a changelog source using DDL once FLIP-95
and FLIP-105 are finished.
At that point, we can have a simple DDL to create a changelog source like this:

CREATE TABLE rate_changelog (
  currency STRING,
  rate DECIMAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'rate_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'
);

In the meantime, we already have a SQL standard temporal join syntax [1],
i.e. "A JOIN B FOR SYSTEM_TIME AS OF ..".
It is currently used for dimension table lookup joins, but the semantics are
the same as the "temporal table function join" [2].
I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more natural: the
definition of B is a *table*, not a *table function*, and the syntax is
included in the SQL standard.

So once we have the ability to define the "rate_changelog" table, we can
use the following query to temporally join the changelog on processing time.

SELECT *
FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
ON orders.currency = rate_changelog.currency;

In a nutshell, once FLIP-95 and FLIP-105 are ready, we can easily support
"temporal join on changelogs" without introducing new syntax.
IMO, introducing a DDL syntax for Temporal Table Function does not look like
an easy path and may involve repetitive work.
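
For reference, a rough sketch of what users currently have to do in the Java
Table API to get a temporal table function (assuming a registered
"rates_history" table with "rowtime" and "currency" columns, an "orders" table
with "amount", "currency" and "rowtime" columns, and "tEnv" being a
StreamTableEnvironment); this is exactly the part that has no pure-SQL
counterpart today:

// Create and register the temporal table function over the history table.
Table ratesHistory = tEnv.from("rates_history");
TemporalTableFunction rates =
    ratesHistory.createTemporalTableFunction("rowtime", "currency");
tEnv.registerFunction("rates", rates);

// Afterwards it can be used from SQL via a LATERAL TABLE join:
Table result = tEnv.sqlQuery(
    "SELECT o.amount * r.rate " +
    "FROM orders AS o, LATERAL TABLE (rates(o.rowtime)) AS r " +
    "WHERE o.currency = r.currency");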

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function





On Thu, 16 Apr 2020 at 23:04, Benchao Li  wrote:

> Hi Konstantin,
>
> Thanks for bringing up this discussion. +1 for the idea.
> We have met this in our company too, and I planned to support it recently
> in our internal branch.
>
> regarding to your questions,
> 1) I think it might be more a table/view than function, just like Temporal
> Table (which is also known as
> dimension table). Maybe we need a DDL like CREATE VIEW and plus some
> additional settings.
> 2) If we design the DDL for it like view, then maybe temporary is ok
> enough.
>
> Konstantin Knauf  wrote on Thu, Apr 16, 2020 at 8:16 PM:
>
> > Hi everyone,
> >
> > it would be very useful if temporal tables could be created  via DDL.
> > Currently, users either need to do this in the Table API or in the
> > environment file of the Flink CLI, which both require the user to switch
> > the context of the SQL CLI/Editor. I recently created a ticket for this
> > request [1].
> >
> > I see two main questions:
> >
> > 1) What would be the DDL syntax? A Temporal Table is on the one hand a
> view
> > and on the other a function depending on how you look at it.
> >
> > 2) Would this temporal table view/function be stored in the catalog or
> only
> > be temporary?
> >
> > I personally do not have much experience in this area of Flink, so I am
> > looking forward to hearing your thoughts on this.
> >
> > Best,
> >
> > Konstantin
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-16824
> >
> > --
> >
> > Konstantin Knauf
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: [DISCUSS] Releasing "fat" and "slim" Flink distributions

2020-04-16 Thread Jark Wu
dist is too
> > > "thin" for first-time SQL users and it is too "fat" for production
> > > users, that is where serving no-one properly with the current
> > > middle-ground. That's why I think introducing those specialized
> > > "spins" of Flink dist would be good.
> > >
> > > By the way, at some point in the future production users might not
> > > even need to get a Flink dist anymore. They should be able to have
> > > Flink as a dependency of their project (including the runtime) and
> > > then build an image from this for Kubernetes or a fat jar for YARN.
> > >
> > > Aljoscha
> > >
> > > On 15.04.20 18:14, wenlong.lwl wrote:
> > >
> > > Hi all,
> > >
> > > Regarding slim and fat distributions, I think different kinds of jobs
> > > may
> > > prefer different type of distribution:
> > >
> > > For DataStream job, I think we may not like fat distribution
> > >
> > > containing
> > >
> > > connectors because user would always need to depend on the connector
> > >
> > > in
> > >
> > > user code, it is easy to include the connector jar in the user lib.
> > >
> > > Less
> > >
> > > jar in lib means less class conflicts and problems.
> > >
> > > For SQL job, I think we are trying to encourage user to user pure
> > > sql(DDL +
> > > DML) to construct their job, In order to improve user experience, It
> > > may be
> > > important for flink, not only providing as many connector jar in
> > > distribution as possible especially the connector and format we have
> > > well
> > > documented,  but also providing an mechanism to load connectors
> > > according
> > > to the DDLs,
> > >
> > > So I think it could be good to place connector/format jars in some
> > > dir like
> > > opt/connector which would not affect jobs by default, and introduce a
> > > mechanism of dynamic discovery for SQL.
> > >
> > > Best,
> > > Wenlong
> > >
> > > On Wed, 15 Apr 2020 at 22:46, Jingsong Li  <
> > jingsongl...@gmail.com>
> > > wrote:
> > >
> > >
> > > Hi,
> > >
> > > I am thinking both "improve first experience" and "improve production
> > > experience".
> > >
> > > I'm thinking about what's the common mode of Flink?
> > > Streaming jobs use Kafka? Batch jobs use Hive?
> > >
> > > Hive 1.2.1 dependencies can be compatible with most of Hive server
> > > versions. So Spark and Presto have built-in Hive 1.2.1 dependency.
> > > Flink is currently mainly used for streaming, so let's not talk
> > > about hive.
> > >
> > > For streaming jobs, the jobs I have in mind are (related to connectors):
> > > - ETL jobs: Kafka -> Kafka
> > > - Join jobs: Kafka -> DimJDBC -> Kafka
> > > - Aggregation jobs: Kafka -> JDBCSink
> > > So Kafka and JDBC are probably the most commonly used, plus the CSV and
> > > JSON formats.
> > > So when we provide such a fat distribution:
> > > - With CSV, JSON.
> > > - With flink-kafka-universal and kafka dependencies.
> > > - With flink-jdbc.
> > > Using this fat distribution, most users can run their jobs well. (A JDBC
> > > driver jar is still required, but that is very natural to add.)
> > > Can these dependencies lead to conflicts? Only Kafka may have conflicts,
> > > but if our goal is to use kafka-universal to support all Kafka versions,
> > > we can hope to cover the vast majority of users.
> > > We don't want to put every jar into the fat distribution, only the common
> > > ones with few conflicts; of course, which jars go into the fat
> > > distribution is a matter for discussion.
> > > We have the opportunity to help the majority of users while also leaving
> > > room for customization.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, Apr 15, 2020 at 10:09 PM Jark Wu  <
> > imj...@gmail.com> wrote:
> > >
> > >
> > > Hi,
> > >
> > > I think we should firs

Re: [VOTE] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-16 Thread Jark Wu
+1 (binding)
Thanks Dawid for driving this.

Best,
Jark

On Thu, 16 Apr 2020 at 15:54, Dawid Wysakowicz 
wrote:

> Hi all,
>
> I would like to start the vote for FLIP-124 [1], which is discussed and
> reached a consensus in the discussion thread [2].
>
> The vote will be open until April 20th, unless there is an objection or
> not enough votes.
>
> Best,
>
> Dawid
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988
>
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-124-Add-open-close-and-Collector-to-De-SerializationSchema-td39864.html
>


Re: [DISCUSS] Releasing Flink 1.9.3

2020-04-15 Thread Jark Wu
+1 for releasing 1.9.3 soon.
Thanks Dian for driving this!

Best,
Jark

On Wed, 15 Apr 2020 at 22:11, Congxian Qiu  wrote:

> +1 to create a new 1.9 bugfix release. and FLINK-16576[1] has merged into
> master, filed a pr for release-1.9 already
>
> [1] https://issues.apache.org/jira/browse/FLINK-16576
>
> Best,
> Congxian
>
>
> Yu Li  于2020年4月15日周三 下午9:16写道:
>
> > +1 to create a new 1.9 bug fix release. Thanks Dian for volunteering as
> our
> > RM.
> >
> > Best Regards,
> > Yu
> >
> >
> > On Wed, 15 Apr 2020 at 16:25, Hequn Cheng  wrote:
> >
> > > +1 for the release and for Dian being the RM.
> > > Thanks Jincheng for your continuous efforts on helping the releasing.
> > >
> > > Best,
> > > Hequn
> > >
> > > On Wed, Apr 15, 2020 at 3:45 PM Till Rohrmann 
> > > wrote:
> > >
> > > > Hi Dian,
> > > >
> > > > creating a new 1.9 bug fix release is a very good idea. +1 for
> creating
> > > it
> > > > soon. Also thanks for volunteering as our release manager.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Fri, Apr 10, 2020 at 7:27 AM Dian Fu 
> wrote:
> > > >
> > > > > Hi Jincheng,
> > > > >
> > > > > Thanks a lot for offering help. It would be very helpful. Thanks
> > again!
> > > > >
> > > > > Regards,
> > > > > Dian
> > > > >
> > > > > > 在 2020年4月10日,下午12:46,jincheng sun  写道:
> > > > > >
> > > > > > Hi Dian,
> > > > > >
> > > > > > Thanks for bring up the discussion. I would like to give you a
> hand
> > > at
> > > > > the
> > > > > > last stage when the RC is finished.  :)
> > > > > >
> > > > > > Best,
> > > > > > Jincheng
> > > > > >
> > > > > >
> > > > > >
> > > > > > Dian Fu  于2020年4月10日周五 上午11:08写道:
> > > > > >
> > > > > >> Hi all,
> > > > > >>
> > > > > >> It has been more than two months since we released Flink 1.9.2.
> > > There
> > > > > are
> > > > > >> already 36 improvements/bugfixes in the release-1.9 branch.
> > > > Therefore, I
> > > > > >> propose to create the next bugfix release 1.9.3 for Flink 1.9.
> > > > > >>
> > > > > >> Most notable fixes are:
> > > > > >>
> > > > > >> - [FLINK-15085] HistoryServer dashboard config json out of sync
> > > > > >> - [FLINK-15575] Azure Filesystem Shades Wrong Package
> > > "httpcomponents"
> > > > > >> - [FLINK-15638] releasing/create_release_branch.sh does not set
> > > > version
> > > > > in
> > > > > >> flink-python/pyflink/version.py
> > > > > >> - [FLINK-16242] BinaryGeneric serialization error cause
> checkpoint
> > > > > failure
> > > > > >> - [FLINK-16573] Kinesis consumer does not properly shutdown
> > > > > RecordFetcher
> > > > > >> threads
> > > > > >> - [FLINK-16047] Blink planner produces wrong aggregate results
> > with
> > > > > state
> > > > > >> clean up
> > > > > >> - [FLINK-16860] TableException: Failed to push filter into table
> > > > source!
> > > > > >> when upgrading flink to 1.9.2
> > > > > >> - [FLINK-16916] The logic of NullableSerializer#copy is wrong
> > > > > >> - [FLINK-16389] Bump Kafka 0.10 to 0.10.2.2
> > > > > >> - [FLINK-15812] HistoryServer archiving is done in Dispatcher
> main
> > > > > thread
> > > > > >> - [FLINK-17062] Fix the conversion from Java row type to Python
> > row
> > > > type
> > > > > >>
> > > > > >> Furthermore, there is one blocker issue which should be merged
> > > before
> > > > > >> 1.9.3 release:
> > > > > >>
> > > > > >> - [FLINK-16576] State inconsistency on restore with memory state
> > > > > backends
> > > > > >> (reviewing)
> > > > > >>
> > > > > >> I would volunteer as the release manager and kick off the
> release
> > > > > process.
> > > > > >> What do you think?
> > > > > >>
> > > > > >> Please let me know if there are any concerns or any other
> blocker
> > > > issues
> > > > > >> need to be fixed in 1.9.3. Thanks.
> > > > > >>
> > > > > >> Appreciated if there is any PMC could help with the final steps
> of
> > > the
> > > > > >> release process.
> > > > > >>
> > > > > >> Regards,
> > > > > >> Dian
> > > > >
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-17169) Refactor BaseRow to use RowKind instead of byte header

2020-04-15 Thread Jark Wu (Jira)
Jark Wu created FLINK-17169:
---

 Summary: Refactor BaseRow to use RowKind instead of byte header
 Key: FLINK-17169
 URL: https://issues.apache.org/jira/browse/FLINK-17169
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Releasing "fat" and "slim" Flink distributions

2020-04-15 Thread Jark Wu
Hi,

I think we should first reach a consensus on "what problem do we want to
solve?":
(1) improve the first experience, or (2) improve the production experience?

As far as I can see from the discussion above, what we want to solve is the
"first experience".
And I think the slim jar is still the best distribution for production,
because it's easier to assemble jars
than to exclude jars, and it avoids potential class conflicts.

If we want to improve the "first experience", I think it makes sense to have a
fat distribution to give users a smoother first experience.
But I would like to call it a "playground distribution" or something like
that to explicitly distinguish it from the "slim production-purpose distribution".
The "playground distribution" can contain some widely used jars, like
universal-kafka-sql-connector, elasticsearch7-sql-connector, avro, json,
csv, etc.
We could even provide a playground Docker image which contains the fat
distribution, python3, and Hive.

Best,
Jark


On Wed, 15 Apr 2020 at 21:47, Chesnay Schepler  wrote:

> I don't see a lot of value in having multiple distributions.
>
> The simple reality is that no fat distribution we could provide would
> satisfy all use-cases, so why even try.
> If users commonly run into issues for certain jars, then maybe those
> should be added to the current distribution.
>
> Personally though I still believe we should only distribute a slim
> version. I'd rather have users always add required jars to the
> distribution than only when they go outside our "expected" use-cases.
> Then we might finally address this issue properly, i.e., tooling to
> assemble custom distributions and/or better error messages if
> Flink-provided extensions cannot be found.
>
> On 15/04/2020 15:23, Kurt Young wrote:
> > Regarding to the specific solution, I'm not sure about the "fat" and
> "slim"
> > solution though. I get the idea
> > that we can make the slim one even more lightweight than current
> > distribution, but what about the "fat"
> > one? Do you mean that we would package all connectors and formats into
> > this? I'm not sure if this is
> > feasible. For example, we can't put all versions of kafka and hive
> > connector jars into lib directory, and
> > we also might need hadoop jars when using filesystem connector to access
> > data from HDFS.
> >
> > So my guess would be we might hand-pick some of the most frequently used
> > connectors and formats
> > into our "lib" directory, like kafka, csv, json metioned above, and still
> > leave some other connectors out of it.
> > If this is the case, then why not we just provide this distribution to
> > user? I'm not sure i get the benefit of
> > providing another super "slim" jar (we have to pay some costs to provide
> > another suit of distribution).
> >
> > What do you think?
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Apr 15, 2020 at 7:08 PM Jingsong Li 
> wrote:
> >
> >> Big +1.
> >>
> >> I like "fat" and "slim".
> >>
> >> For csv and json, like Jark said, they are quite small and don't have
> other
> >> dependencies. They are important to kafka connector, and important
> >> to upcoming file system connector too.
> >> So can we move them to both "fat" and "slim"? They're so important, and
> >> they're so lightweight.
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Wed, Apr 15, 2020 at 4:53 PM godfrey he  wrote:
> >>
> >>> Big +1.
> >>> This will improve user experience (special for Flink new users).
> >>> We answered so many questions about "class not found".
> >>>
> >>> Best,
> >>> Godfrey
> >>>
> >>> Dian Fu  于2020年4月15日周三 下午4:30写道:
> >>>
> >>>> +1 to this proposal.
> >>>>
> >>>> Missing connector jars is also a big problem for PyFlink users.
> >>> Currently,
> >>>> after a Python user has installed PyFlink using `pip`, he has to
> >> manually
> >>>> copy the connector fat jars to the PyFlink installation directory for
> >> the
> >>>> connectors to be used if he wants to run jobs locally. This process is
> >>> very
> >>>> confuse for users and affects the experience a lot.
> >>>>
> >>>> Regards,
> >>>> Dian
> >>>>
> >>>>> 在 2020年4月15日,下午3:51,Jark Wu  写道:
> >>>>>

Re: [DISCUSS] Releasing Flink 1.10.1

2020-04-15 Thread Jark Wu
y to cause problem.
> > >> >>>> > > > > >
> > >> >>>> > > > > > So basically only people have small 'process.size' in
> > >> custom
> > >> >>>> config
> > >> >>>> > > > file
> > >> >>>> > > > > > are really affected. I'm not sure what is the
> proportion
> > of
> > >> >>>> such
> > >> >>>> > use
> > >> >>>> > > > > cases
> > >> >>>> > > > > > though. (From questions asked on the user ML, probably
> > not
> > >> >>>> much).
> > >> >>>> > > > > >
> > >> >>>> > > > > > Thank you~
> > >> >>>> > > > > >
> > >> >>>> > > > > > Xintong Song
> > >> >>>> > > > > >
> > >> >>>> > > > > >
> > >> >>>> > > > > >
> > >> >>>> > > > > > On Thu, Mar 12, 2020 at 10:09 PM Stephan Ewen <
> > >> >>>> se...@apache.org>
> > >> >>>> > > > wrote:
> > >> >>>> > > > > >
> > >> >>>> > > > > > > No need to revert it now - I am not saying it should
> > not
> > >> go
> > >> >>>> into
> > >> >>>> > > > > 1.10.1,
> > >> >>>> > > > > > I
> > >> >>>> > > > > > > am just saying this is not clear to me yet.
> > >> >>>> > > > > > >
> > >> >>>> > > > > > > We have to trade off the fact that we may break some
> > >> >>>> deployments
> > >> >>>> > > with
> > >> >>>> > > > > the
> > >> >>>> > > > > > > fact that we will "safe" a lot of new deployments.
> > >> >>>> > > > > > > I simply lack the data points / insight at the moment
> > to
> > >> >>>> > understand
> > >> >>>> > > > how
> > >> >>>> > > > > > > significant both cases are, meaning how many users
> > would
> > >> be
> > >> >>>> > > affected
> > >> >>>> > > > > and
> > >> >>>> > > > > > > how badly.
> > >> >>>> > > > > > >
> > >> >>>> > > > > > > Independent of that, improving the error message is
> > >> always
> > >> >>>> > helpful.
> > >> >>>> > > > > > >
> > >> >>>> > > > > > > On Thu, Mar 12, 2020 at 1:22 PM Andrey Zagrebin <
> > >> >>>> > > > > > > azagrebin.apa...@gmail.com>
> > >> >>>> > > > > > > wrote:
> > >> >>>> > > > > > >
> > >> >>>> > > > > > > > >   - For 1.10.1 I am not completely sure, because
> > >> users
> > >> >>>> expect
> > >> >>>> > > to
> > >> >>>> > > > > > > upgrade
> > >> >>>> > > > > > > > > that without config adjustments. That might not
> be
> > >> >>>> possible
> > >> >>>> > > with
> > >> >>>> > > > > that
> > >> >>>> > > > > > > > > change.
> > >> >>>> > > > > > > >
> > >> >>>> > > > > > > > Ok, makes sense, I will revert it for 1.10 and only
> > >> try to
> > >> >>>> > > improve
> > >> >>>> > > > > > error
> > >> >>>> > > > > > > > message and docs.
> > >> >>>> > > > > > > >
> > >> >>>> > > > > > > > > On 12 Mar 2020, at 13:15, Stephan Ewen <
&g

[jira] [Created] (FLINK-17157) TaskMailboxProcessorTest.testIdleTime failed on travis

2020-04-15 Thread Jark Wu (Jira)
Jark Wu created FLINK-17157:
---

 Summary: TaskMailboxProcessorTest.testIdleTime failed on travis
 Key: FLINK-17157
 URL: https://issues.apache.org/jira/browse/FLINK-17157
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Reporter: Jark Wu


CI: https://travis-ci.com/github/flink-ci/flink/jobs/318558536


{code}
[ERROR] Tests run: 11, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.102 
s <<< FAILURE! - in 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxProcessorTest
[ERROR] 
testIdleTime(org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxProcessorTest)
  Time elapsed: 0.032 s  <<< ERROR!
java.lang.NullPointerException
at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxProcessorTest.testIdleTime(TaskMailboxProcessorTest.java:288)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Releasing "fat" and "slim" Flink distributions

2020-04-15 Thread Jark Wu
+1 to the proposal. I also found the "download additional jar" step
really cumbersome when preparing webinars.

At least, I think flink-csv and flink-json should be in the distribution;
they are quite small and don't have other dependencies.

Best,
Jark

On Wed, 15 Apr 2020 at 15:44, Jeff Zhang  wrote:

> Hi Aljoscha,
>
> Big +1 for the fat flink distribution, where do you plan to put these
> connectors ? opt or lib ?
>
> Aljoscha Krettek  于2020年4月15日周三 下午3:30写道:
>
> > Hi Everyone,
> >
> > I'd like to discuss about releasing a more full-featured Flink
> > distribution. The motivation is that there is friction for SQL/Table API
> > users that want to use Table connectors which are not there in the
> > current Flink Distribution. For these users the workflow is currently
> > roughly:
> >
> >   - download Flink dist
> >   - configure csv/Kafka/json connectors per configuration
> >   - run SQL client or program
> >   - decrypt error message and research the solution
> >   - download additional connector jars
> >   - program works correctly
> >
> > I realize that this can be made to work but if every SQL user has this
> > as their first experience that doesn't seem good to me.
> >
> > My proposal is to provide two versions of the Flink Distribution in the
> > future: "fat" and "slim" (names to be discussed):
> >
> >   - slim would be even trimmer than todays distribution
> >   - fat would contain a lot of convenience connectors (yet to be
> > determined which one)
> >
> > And yes, I realize that there are already more dimensions of Flink
> > releases (Scala version and Java version).
> >
> > For background, our current Flink dist has these in the opt directory:
> >
> >   - flink-azure-fs-hadoop-1.10.0.jar
> >   - flink-cep-scala_2.12-1.10.0.jar
> >   - flink-cep_2.12-1.10.0.jar
> >   - flink-gelly-scala_2.12-1.10.0.jar
> >   - flink-gelly_2.12-1.10.0.jar
> >   - flink-metrics-datadog-1.10.0.jar
> >   - flink-metrics-graphite-1.10.0.jar
> >   - flink-metrics-influxdb-1.10.0.jar
> >   - flink-metrics-prometheus-1.10.0.jar
> >   - flink-metrics-slf4j-1.10.0.jar
> >   - flink-metrics-statsd-1.10.0.jar
> >   - flink-oss-fs-hadoop-1.10.0.jar
> >   - flink-python_2.12-1.10.0.jar
> >   - flink-queryable-state-runtime_2.12-1.10.0.jar
> >   - flink-s3-fs-hadoop-1.10.0.jar
> >   - flink-s3-fs-presto-1.10.0.jar
> >   - flink-shaded-netty-tcnative-dynamic-2.0.25.Final-9.0.jar
> >   - flink-sql-client_2.12-1.10.0.jar
> >   - flink-state-processor-api_2.12-1.10.0.jar
> >   - flink-swift-fs-hadoop-1.10.0.jar
> >
> > Current Flink dist is 267M. If we removed everything from opt we would
> > go down to 126M. I would recommend this, because the large majority of
> > the files in opt are probably unused.
> >
> > What do you think?
> >
> > Best,
> > Aljoscha
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>


[jira] [Created] (FLINK-17150) Introduce Canal format to support reading canal changelogs

2020-04-14 Thread Jark Wu (Jira)
Jark Wu created FLINK-17150:
---

 Summary: Introduce Canal format to support reading canal changelogs
 Key: FLINK-17150
 URL: https://issues.apache.org/jira/browse/FLINK-17150
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / Ecosystem
Reporter: Jark Wu
 Fix For: 1.11.0


Introduce CanalFormatFactory and CanalRowDeserializationSchema to read 
[canal|https://github.com/alibaba/canal] changelogs.


{code:sql}
CREATE TABLE my_table (
  ...
) WITH (
 'connector'='...', -- e.g. 'kafka'
 'format'='canal-json',
 'format.ignore-parse-errors'='true' -- default false
);
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17149) Introduce Debezium format to support reading debezium changelogs

2020-04-14 Thread Jark Wu (Jira)
Jark Wu created FLINK-17149:
---

 Summary: Introduce Debezium format to support reading debezium 
changelogs
 Key: FLINK-17149
 URL: https://issues.apache.org/jira/browse/FLINK-17149
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / Ecosystem
Reporter: Jark Wu
 Fix For: 1.11.0


Introduce {{DebeziumFormatFactory}} and {{DebeziumRowDeserializationSchema}} to 
read debezium changelogs.


{code:sql}
CREATE TABLE my_table (
  ...
) WITH (
  'connector'='...',  -- e.g. 'kafka'
  'format'='debezium-json',
  'format.schema-include'='true', -- default false; Debezium can be configured to include or exclude the message schema
  'format.ignore-parse-errors'='true' -- default false
);
{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[RESULT][VOTE] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-14 Thread Jark Wu
Hi all,

The voting time for FLIP-105 has passed. I'm closing the vote now.

There were 5 +1 votes, 3 of which are binding:

- Benchao (non-binding)
- Jark (binding)
- Jingsong Li (binding)
- zoudan (non-binding)
- Kurt (binding)

There were no disapproving votes.

Thus, FLIP-105 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

Best,
Jark


Re: [VOTE] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-13 Thread Jark Wu
+1 (binding)

Best,
Jark

On Sun, 12 Apr 2020 at 09:24, Benchao Li  wrote:

> +1 (non-binding)
>
> Jark Wu  于2020年4月11日周六 上午11:31写道:
>
> > Hi all,
> >
> > I would like to start the vote for FLIP-105 [1], which is discussed and
> > reached a consensus in the discussion thread [2].
> >
> > The vote will be open for at least 72h, unless there is an objection or
> not
> > enough votes.
> >
> > Thanks,
> > Jark
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
> > [2]
> >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-105-Support-to-Interpret-and-Emit-Changelog-in-Flink-SQL-td37665.html
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: [VOTE] FLIP-71: E2E View support in Flink SQL

2020-04-12 Thread Jark Wu
+1

Best,
Jark

On Sun, 12 Apr 2020 at 12:28, Benchao Li  wrote:

> +1 (non-binding)
>
> zoudan  于2020年4月12日周日 上午9:52写道:
>
> > +1 (non-binding)
> >
> > Best,
> > Dan Zou
> >
> >
> > > 在 2020年4月10日,09:30,Danny Chan  写道:
> > >
> > > +1 from my side.
> > >
> > > Best,
> > > Danny Chan
> > > 在 2020年4月9日 +0800 PM9:23,Timo Walther ,写道:
> > >> +1 (binding)
> > >>
> > >> Thanks for your efforts.
> > >>
> > >> Regards,
> > >> Timo
> > >>
> > >>
> > >> On 09.04.20 14:46, Zhenghua Gao wrote:
> > >>> Hi all,
> > >>>
> > >>> I'd like to start the vote for FLIP-71[1] which adds E2E view support
> > in
> > >>> Flink SQL.
> > >>> This FLIP is discussed in the thread[2].
> > >>>
> > >>> The vote will be open for at least 72 hours. Unless there is an
> > objection.
> > >>> I will try to
> > >>> close it by April 13, 2020 09:00 UTC if we have received sufficient
> > votes.
> > >>>
> > >>> [1]
> > >>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-71%3A+E2E+View+support+in+FLINK+SQL
> > >>>
> > >>> [2]
> > >>>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-71-E2E-View-support-in-Flink-SQL-td33131.html#a39787
> > >>>
> > >>> *Best Regards,*
> > >>> *Zhenghua Gao*
> > >>>
> > >>
> >
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


[VOTE] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-10 Thread Jark Wu
Hi all,

I would like to start the vote for FLIP-105 [1], which is discussed and
reached a consensus in the discussion thread [2].

The vote will be open for at least 72h, unless there is an objection or not
enough votes.

Thanks,
Jark

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-105-Support-to-Interpret-and-Emit-Changelog-in-Flink-SQL-td37665.html


Re: [DISCUSS] FLIP-71 - E2E View support in Flink SQL

2020-04-10 Thread Jark Wu
Sorry for the late reply,

I have some concerns around "Supporting SHOW VIEWS|DESCRIBE VIEW name".
Currently, in the SQL CLI, "SHOW TABLES" also lists views and "DESCRIBE
name" can also describe a view.
Shall we remove the view support from those commands if we want to support a
dedicated "SHOW VIEWS"/"DESCRIBE VIEW name"?

Best,
Jark

On Wed, 8 Apr 2020 at 23:49, Timo Walther  wrote:

> I didn't know that. We should definitely implement this asap. Please
> open a JIRA issue.
>
> Thanks,
> Timo
>
>
> On 08.04.20 14:29, Zhenghua Gao wrote:
> > Hi Timo,
> >
> > Actually "TEMPORARY" is not supported in table DDL now.
> > But you are right I could support "CREATE TEMPORARY VIEW" in this FLIP.
> > And may be we should open a separate JIRA ticket to track supporting it
> in
> > table DDL?
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > On Wed, Apr 8, 2020 at 7:48 PM Timo Walther  wrote:
> >
> >> Hi Zhenghua,
> >>
> >> FLINK-10232 is quite old and a lot of stuff was discussed and agreed on
> >> since then. I don't like to postpone the 'TEMPORARY' keyword because it
> >> is a important concept that is already part of the Table API (see
> >> TableEnvironment.createTemporaryView) and in function DDL and table DDL.
> >> It is not complicated to supported it in this FLIP and just a couple of
> >> line of code more.
> >>
> >> Regards,
> >> Timo
> >>
> >> On 08.04.20 11:27, Zhenghua Gao wrote:
> >>> Another concern about "CREATE DDL" is:
> >>>
> >>> FLINK-10232 proposes using "IF NOT EXISTS" to control the behavior
> when a
> >>> view or table with the same name already exists.
> >>> And "OR REPLACE" for type/library/function DDL.
> >>>
> >>> @godfrey he  I will keep the "IF NOT EXISTS"
> syntax
> >>> and postpone the "OR REPLACE" syntax until we need it.
> >>>
> >>>
> >>> *Best Regards,*
> >>> *Zhenghua Gao*
> >>>
> >>>
> >>> On Wed, Apr 8, 2020 at 4:46 PM Zhenghua Gao  wrote:
> >>>
>  Hi Timo,
> 
>  Shall we postpone the support of 'TEMPORARY' keyword since it's not
>  mentioned in FLINK-10232?
>  
> 
>  *Best Regards,*
>  *Zhenghua Gao*
> 
> 
>  On Wed, Apr 8, 2020 at 3:30 PM Timo Walther 
> wrote:
> 
> > Hi Zhenghua,
> >
> > VIEWS should also support the TEMPORARY keyword according to FLIP-64.
> >
> > Otherwise the FLIP looks good to me.
> >
> > Regards,
> > Timo
> >
> >
> > On 08.04.20 07:31, Zhenghua Gao wrote:
> >> @Danny Chan   you‘re right. I have updated
> the
> > doc.
> >>
> >> *Best Regards,*
> >> *Zhenghua Gao*
> >>
> >>
> >> On Wed, Apr 8, 2020 at 1:20 PM Danny Chan 
> >> wrote:
> >>
> >>> +1 for the proposal, a small concern for drop view statement:
> >>>
> >>> dropViewStatement:
> >>>  DROP VIEW name [ IF EXISTS ]
> >>> I think the drop statement should be
> >>> DROP VIEW [ IF EXISTS ] name
> >>>
> >>> Best,
> >>> Danny Chan
> >>> 在 2020年4月8日 +0800 AM11:54,Kurt Young ,写道:
>  This FLIP seems to be quite straightforward, +1 from my side.
> 
>  Best,
>  Kurt
> 
> 
>  On Tue, Apr 7, 2020 at 8:42 PM Zhenghua Gao 
> >> wrote:
> 
> > forward the reply to ML too.
> >
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > -- Forwarded message -
> > From: Zhenghua Gao 
> > Date: Tue, Apr 7, 2020 at 8:40 PM
> > Subject: Re: [DISCUSS] FLIP-71 - E2E View support in Flink SQL
> > To: godfrey he 
> >
> >
> >>> regarding to "Interoperability between Flink and Hive is not
> > guaranteed", can you explain this more?
> > We have several limitations of interoperability between flink
> >> objects
> >>> and
> > hive objects (tables, functions, etc).
> > So we don't promise the interoperability of views between flink
> and
> >>> hive
> > since a view is defined base on these objects.
> >
> >>> "CREATE VIEW [ IF NOT EXISTS ]"
> > This should be "CREATE VIEW [OR REPLACE]".
> >
> >>> "DESC"
> > It's a shortcut of "DESCRIBE" in SQL Client (See desc table xxx).
> > In DDL, we should only support "SHOW VIEWS" and "DESCRIBE VIEW
> >> xxx".
> >
> > I have updated the design doc, thanks.
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > On Tue, Apr 7, 2020 at 8:09 PM godfrey he 
> > wrote:
> >
> >> Hi Zhenghua,
> >>
> >> Thanks for driving this. It's one step forward that
> >> TableEnvironment
> >> supports more complete SQLs.
> >> I have a few minor questions:
> >> 1. regarding to "Interoperability between Flink and Hive is not
> >> guaranteed", can you explain this more?

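For reference, the DDL shape discussed in this thread would roughly look like the sketch below; the exact grammar is specified in the FLIP-71 document, and the view/table names here are illustrative only.

    CREATE TEMPORARY VIEW IF NOT EXISTS my_view AS
    SELECT user_id, COUNT(*) AS cnt FROM orders GROUP BY user_id;

    DROP VIEW IF EXISTS my_view;
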
Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-10 Thread Jark Wu
Hi Xiaogang,

I think this proposal doesn't conflict with your use case, you can still
chain a ProcessFunction after a source which emits raw data.
But I'm not in favor of chaining ProcessFunction after source, and we
should avoid that, because:

1) For correctness, it is necessary to perform the watermark generation as
early as possible in order to be close to the actual data
 generation within a source's data partition. This is also the purpose of
per-partition watermark and event-time alignment.
 Many ongoing FLIPs (e.g. FLIP-27, FLIP-95) put a lot of work into this effort.
Deserializing records and generating watermarks in a chained
 ProcessFunction would make it difficult to do per-partition watermarks in the
future.
2) In Flink SQL, a source should emit the deserialized row instead of raw
data. Otherwise, users have to define raw byte[] as the
 single column of the defined table, and parse them in queries, which is
very inconvenient.

Best,
Jark

On Fri, 10 Apr 2020 at 09:18, SHI Xiaogang  wrote:

> Hi,
>
> I don't think the proposal is a good solution to the problems. I am in
> favour of using a ProcessFunction chained to the source/sink function to
> serialize/deserialize the records, instead of embedding (de)serialization
> schema in source/sink function.
>
> Message packing is heavily used in our production environment to allow
> compression and improve throughput. As buffered messages have to be
> delivered when the time exceeds the limit, timers are also required in our
> cases. I think it's also a common need for other users.
>
> In this proposal, with more components added into the context, in the
> end we will find the serialization/deserialization schema is just another
> wrapper of ProcessFunction.
>
> Regards,
> Xiaogang
>
> Aljoscha Krettek  于2020年4月7日周二 下午6:34写道:
>
> > On 07.04.20 08:45, Dawid Wysakowicz wrote:
> >
> > > @Jark I was aware of the implementation of SinkFunction, but it was a
> > > conscious choice to not do it that way.
> > >
> > > Personally I am against giving a default implementation to both the new
> > > and old methods. This results in an interface that by default does
> > > nothing or notifies the user only in the runtime, that he/she has not
> > > implemented a method of the interface, which does not sound like a good
> > > practice to me. Moreover I believe the method without a Collector will
> > > still be the preferred method by many users. Plus it communicates
> > > explicitly what is the minimal functionality required by the interface.
> > > Nevertheless I am happy to hear other opinions.
> >
> > Dawid and I discussed this before. I did the extension of the
> > SinkFunction but by now I think it's better to do it this way, because
> > otherwise you can implement the interface without implementing any
> methods.
> >
> > > @all I also prefer the buffering approach. Let's wait a day or two more
> > > to see if others think differently.
> >
> > I'm also in favour of buffering outside the lock.
> >
> > Also, +1 to this FLIP.
> >
> > Best,
> > Aljoscha
> >
>


Re: [VOTE] FLIP-113: Supports Dynamic Table Options for Flink SQL

2020-04-10 Thread Jark Wu
+1 from my side (binding)

Best,
Jark

On Fri, 10 Apr 2020 at 17:03, Timo Walther  wrote:

> +1 (binding)
>
> Thanks for the healthy discussion. I think this feature can be useful
> during the development of a pipeline.
>
> Regards,
> Timo
>
> On 10.04.20 03:34, Danny Chan wrote:
> > Hi all,
> >
> > I would like to start the vote for FLIP-113 [1], which is discussed and
> > reached a consensus in the discussion thread [2].
> >
> > The vote will be open until April 13nd (72h), unless there is an
> > objection or not enough votes.
> >
> > Best,
> > Danny Chan
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
> > [2]
> >
> https://lists.apache.org/thread.html/r94af5d3d97e76e7dd9df68cb0becc7ba74d15591a8fab84c72fa%40%3Cdev.flink.apache.org%3E
> >
>
>


Re: [DISCUSS] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-10 Thread Jark Wu
Hi,

After a short offline discussion with Kurt, it seems that I misunderstood
Kurt's meaning.
Kurt meant: is `format=debezium` enough, or should it be split into two options,
`format=debezium` and `format.encoding=json`?

Debezium supports not only JSON encoding but also Avro. Canal supports JSON
and Protobuf. So a single `format=debezium` is not enough (in the long
term).
The reasons I proposed a single option `format=debezium-json` instead of two:
 - It's simpler to write a single option instead of two; we also made this
design decision for "connector" and "version".
 - I didn't find a good name for separate option keys, because JSON is also
a format, not an encoding, but `format.format=json` is weird.

Hi everyone,

If there are no further concerns, I would like to start a voting thread by
tomorrow.

Best,
Jark



On Wed, 8 Apr 2020 at 15:37, Jark Wu  wrote:

> Hi Kurt,
>
> The JSON encoding of Debezium can be configured to include or exclude the
> message schema using the `value.converter.schemas.enable` properties.
> That's why we propose to have a `format.schema-include` property key to
> config how to parse the json.
>
> Besides, the encoding format of debezium is stable and unified across
> different databases (MySQL, Oracle, SQL Server, DB2, PostgresSQL).
> However, because of the limitation of some special databases, some
> databases CDC encoding are different (Cassandra and MongoDB).
> If we want to support them in the future, we can introduce an optional
> property key, e.g. `format.encoding-connector=mongodb`, to recognize this
> special encoding.
>
> Canal currently only support to capture changes from MySQL, so there is
> only one encoding in Canal. But both Canal and Debezium may evolve their
> encoding in the future.
> We can also introduce a `format.encoding-version` in the future if needed.
>
> Best,
> Jark
>
>
> On Wed, 8 Apr 2020 at 14:26, Kurt Young  wrote:
>
>> One minor comment, is there any other encoding or format in debezium? I'm
>> asking because the format
>> name is debezium-json, i'm wondering whether debezium is enough. This also
>> applies to canal.
>>
>> Best,
>> Kurt
>>
>>
>> On Tue, Apr 7, 2020 at 11:49 AM Jark Wu  wrote:
>>
>> > Hi everyone,
>> >
>> > Since this FLIP was proposed, the community has discussed a lot about
>> the
>> > first approach: introducing new TableSource and TableSink interfaces to
>> > support changelog.
>> > And yes, that is FLIP-95 which has been accepted last week. So most of
>> the
>> > work has been merged into FLIP-95.
>> >
>> > In order to support the goal of FLIP-105, there is still a little
>> things to
>> > discuss: how to connect external CDC formats.
>> > We propose to introduce 2 new formats: Debezium format and Canal format.
>> > They are the most popular CDC tools according to the survey in user [1]
>> and
>> > user-zh [2] mailing list.
>> >
>> > I have updated the FLIP:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
>> >
>> > Welcome feedbacks!
>> >
>> > Best,
>> > Jark
>> >
>> > [1]:
>> >
>> >
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SURVEY-What-Change-Data-Capture-tools-are-you-using-td33569.html
>> > [2]: http://apache-flink.147419.n8.nabble.com/SURVEY-CDC-td1910.html
>> >
>> >
>> > On Fri, 14 Feb 2020 at 22:08, Jark Wu  wrote:
>> >
>> > > Hi everyone,
>> > >
>> > > I would like to start discussion about how to support interpreting
>> > > external changelog into Flink SQL, and how to emit changelog from
>> Flink
>> > SQL.
>> > >
>> > > This topic has already been mentioned several times in the past. CDC
>> > > (Change Data Capture) data has been a very important streaming data in
>> > the
>> > > world. Connect to CDC is a significant feature for Flink, it fills the
>> > > missing piece for Flink's streaming processing.
>> > >
>> > > In FLIP-105, we propose 2 approaches to achieve.
>> > > One is introducing new TableSource interface (higher priority),
>> > > the other is introducing new SQL syntax to interpret and emit
>> changelog.
>> > >
>> > > FLIP-105:
>> > >
>> >
>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#
>> > >
>> > > Thanks for any feedback!
>> > >
>> > > Best,
>> > > Jark
>> > >
>> >
>>
>


Re: Configuring autolinks to Flink JIRA ticket in github repos

2020-04-09 Thread Jark Wu
Thanks Yun,

This is a great feature! I was surprised by the autolink feature yesterday
(didn't know your work at that time).

Best,
Jark

On Thu, 9 Apr 2020 at 16:12, Yun Tang  wrote:

> Hi community
>
> The autolink to Flink JIRA ticket has taken effect. You could refer to the
> commit details page[1] to see all Flink JIRA titles within commits has the
> hyper link underline. Moreover, you don't need to use markdown language to
> create hyper link to Flink JIRA ticket when discussing in the pull
> requests. e.g FLINK-16850 could point to the link instead of [FLINK-16850](
> https://issues.apache.org/jira/browse/FLINK-16850)
>
>
> [1] https://github.com/apache/flink/commits/master
>
> Best
> Yun Tang
>
> 
> From: Till Rohrmann 
> Sent: Thursday, April 2, 2020 23:11
> To: dev 
> Subject: Re: Configuring autolinks to Flink JIRA ticket in github repos
>
> Nice, this is a cool feature. Thanks for asking INFRA for it.
>
> Cheers,
> Till
>
> On Wed, Apr 1, 2020 at 6:52 PM Yun Tang  wrote:
>
> > Hi community.
> >
> > I noticed that Github supports autolink reference recently [1]. This is
> > helpful to allow developers could open Jira ticket link from pull
> requests
> > title directly when accessing github repo.
> >
> > I have already created INFRA-20055 [2] to ask for configuration for seven
> > Flink related github repos. Hope it could be resolved soon 
> >
> >
> > [1]
> >
> https://help.github.com/en/github/administering-a-repository/configuring-autolinks-to-reference-external-resources
> > [2] https://issues.apache.org/jira/browse/INFRA-20055
> >
> > Best
> > Yun Tang
> >
>


Re: [DISCUSS]FLIP-113: Support SQL and planner hints

2020-04-08 Thread Jark Wu
`table.dynamic-table-options.enabled` and `TableConfigOptions` sounds good
to me.

Best,
Jark

On Wed, 8 Apr 2020 at 18:59, Danny Chan  wrote:

> `table.dynamic-table-options.enabled` seems fine to me, I would make a new
> `TableConfigOptions` class and put the config option there ~
>
> What do you think about the new class to put ?
>
> Best,
> Danny Chan
> 在 2020年4月8日 +0800 PM5:33,dev@flink.apache.org,写道:
> >
> > `table.dynamic-table-options.enabled`
>


Re: [DISCUSS] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-08 Thread Jark Wu
Hi Kurt,

The JSON encoding of Debezium can be configured to include or exclude the
message schema using the `value.converter.schemas.enable` properties.
That's why we propose to have a `format.schema-include` property key to
config how to parse the json.

Besides, the encoding format of Debezium is stable and unified across
different databases (MySQL, Oracle, SQL Server, DB2, PostgreSQL).
However, because of the limitations of some databases, the CDC encoding of a
few databases is different (Cassandra and MongoDB).
If we want to support them in the future, we can introduce an optional
property key, e.g. `format.encoding-connector=mongodb`, to recognize this
special encoding.

Canal currently only supports capturing changes from MySQL, so there is
only one encoding in Canal. But both Canal and Debezium may evolve their
encoding in the future.
We can also introduce a `format.encoding-version` in the future if needed.

Best,
Jark


On Wed, 8 Apr 2020 at 14:26, Kurt Young  wrote:

> One minor comment, is there any other encoding or format in debezium? I'm
> asking because the format
> name is debezium-json, i'm wondering whether debezium is enough. This also
> applies to canal.
>
> Best,
> Kurt
>
>
> On Tue, Apr 7, 2020 at 11:49 AM Jark Wu  wrote:
>
> > Hi everyone,
> >
> > Since this FLIP was proposed, the community has discussed a lot about the
> > first approach: introducing new TableSource and TableSink interfaces to
> > support changelog.
> > And yes, that is FLIP-95 which has been accepted last week. So most of
> the
> > work has been merged into FLIP-95.
> >
> > In order to support the goal of FLIP-105, there is still a little things
> to
> > discuss: how to connect external CDC formats.
> > We propose to introduce 2 new formats: Debezium format and Canal format.
> > They are the most popular CDC tools according to the survey in user [1]
> and
> > user-zh [2] mailing list.
> >
> > I have updated the FLIP:
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
> >
> > Welcome feedbacks!
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SURVEY-What-Change-Data-Capture-tools-are-you-using-td33569.html
> > [2]: http://apache-flink.147419.n8.nabble.com/SURVEY-CDC-td1910.html
> >
> >
> > On Fri, 14 Feb 2020 at 22:08, Jark Wu  wrote:
> >
> > > Hi everyone,
> > >
> > > I would like to start discussion about how to support interpreting
> > > external changelog into Flink SQL, and how to emit changelog from Flink
> > SQL.
> > >
> > > This topic has already been mentioned several times in the past. CDC
> > > (Change Data Capture) data has been a very important streaming data in
> > the
> > > world. Connect to CDC is a significant feature for Flink, it fills the
> > > missing piece for Flink's streaming processing.
> > >
> > > In FLIP-105, we propose 2 approaches to achieve.
> > > One is introducing new TableSource interface (higher priority),
> > > the other is introducing new SQL syntax to interpret and emit
> changelog.
> > >
> > > FLIP-105:
> > >
> >
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#
> > >
> > > Thanks for any feedback!
> > >
> > > Best,
> > > Jark
> > >
> >
>


Re: [DISCUSS]FLIP-113: Support SQL and planner hints

2020-04-07 Thread Jark Wu
Thanks for the summary Danny. +1 to the new proposal.

I have a minor concern about the global configuration
`table.optimizer.dynamic-table-options.enabled`: does it belong to the
optimizer?
From my point of view, it is just an API to set table options and it uses
Calcite in the implementation.
I'm also thinking about the names of other configurations, e.g.
time zone, code-gen length, state TTL.
Should they be prefixed with "optimizer", "exec", something else, or nothing?

Best,
Jark

On Tue, 7 Apr 2020 at 23:17, Timo Walther  wrote:

> Thanks for the update Danny. +1 from my side.
>
> Regards,
> Timo
>
>
> On 07.04.20 13:25, Danny Chan wrote:
> > Hi, every ~
> >
> > It seems that we all agree to drop the idea for white/black list for each
> > connector, and have a global config option to default disable this
> feature.
> >
> > I have also discussed with Timo and Jark about the interface
> > TableSourceTable.Context.getExecutionOptions and finally we decide to
> > introduce a new interface CatalogTable#copy(Map) to
> support
> > re-generate the table with new table options.
> >
> > So let me summarize the current design broadly again:
> >
> > - Use the syntax /*+ OPTIONS('k1' = 'v1', 'k2' = 'v2') to describe
> the
> > dynamic table options
> > - There is no constraint on which option key can be used in the
> OPTIONS,
> > that means, any option key is allowed, the factory would to the
> validation
> > work finally
> > - Introduce method CatalogTable#copy, we use this method to
> regenerate a
> > new CatalogTable to find a table factory and creates table
> source/sink
> > - There is a global config option to default disable this feature (if
> > user uses OPTIONS, an exception throws to tell open the option)
> >
> > I have updated the WIKI
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
> >,
> > look forward to your suggestions ~
> >
> > Jark Wu  于2020年4月7日周二 上午11:24写道:
> >
> >> I'm fine to disable this feature by default and avoid
> >> whitelisting/blacklisting. This simplifies a lot of things.
> >>
> >> Regarding to TableSourceFactory#Context#getExecutionOptions, do we
> really
> >> need this interface?
> >> Should the connector factory be aware of the properties is merged with
> >> hints or not?
> >> What's the problem if we always get properties from
> >> `CatalogTable#getProperties`?
> >>
> >> Best,
> >> Jark
> >>
> >> On Tue, 7 Apr 2020 at 10:39, Kurt Young  wrote:
> >>
> >>> Sounds like a reasonable compromise, disabling this feature by default
> >> is a
> >>> way to protect
> >>> the vulnerability, and we can simplify the design quite a lot. We can
> >>> gather some users'
> >>> feedback to see whether further protections are necessary in the
> future.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Mon, Apr 6, 2020 at 11:49 PM Timo Walther 
> wrote:
> >>>
> >>>> I agree with Aljoscha. The length of this thread shows that this is
> >>>> highly controversal. I think nobody really likes this feature 100% but
> >>>> we could not find a better solution. I would consider it as a
> >>>> nice-to-have improvement during a notebook/debugging session.
> >>>>
> >>>> I would accept avoiding whitelisting/blacklisting if the feature is
> >>>> disabled by default. And we make the merged properties available in a
> >>>> separate TableSourceFactory#Context#getExecutionOptions as Danny
> >>> proposed.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Thanks,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 06.04.20 09:59, Aljoscha Krettek wrote:
> >>>>> The reason I'm saying it should be disabled by default is that this
> >>> uses
> >>>>> hint syntax, and hints should really not change query semantics.
> >>>>>
> >>>>> I'm quite strongly against hints that change query semantics, but if
> >> we
> >>>>> disable this by default I would be (reluctantly) OK with the feature.
> >>>>> Companies that create deployments or set up the SQL environment for
> >>>>> users can enable the feature if they want.
>

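For reference, with the design summarized in this thread a query could override table options inline roughly as sketched below, once the feature has been enabled through `table.dynamic-table-options.enabled` (it is disabled by default); the table name and the overridden option key are illustrative only.

    SELECT id, amount
    FROM kafka_orders /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;
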
[jira] [Created] (FLINK-17028) Introduce a new HBase connector with new property keys

2020-04-07 Thread Jark Wu (Jira)
Jark Wu created FLINK-17028:
---

 Summary: Introduce a new HBase connector with new property keys
 Key: FLINK-17028
 URL: https://issues.apache.org/jira/browse/FLINK-17028
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / HBase, Table SQL / Ecosystem
Reporter: Jark Wu


This new HBase connector should use the new interfaces proposed by FLIP-95, e.g. 
DynamicTableSource, DynamicTableSink, and Factory.

The new proposed keys :
||Old key||New key||Note||
|connector.type|connector| |
|connector.version|N/A|merged into 'connector' key|
|connector.table-name|table-name| |
|connector.zookeeper.quorum|zookeeper.quorum| |
|connector.zookeeper.znode.parent|zookeeper.znode-parent| |
|connector.write.buffer-flush.max-size|sink.buffer-flush.max-size| |
|connector.write.buffer-flush.max-rows|sink.buffer-flush.max-rows| |
|connector.write.buffer-flush.interval|sink.buffer-flush.interval| |
 
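A DDL using the proposed keys could look roughly like the sketch below; the connector identifier, schema, and option values are illustrative only.

{code:sql}
CREATE TABLE hbase_table (
  rowkey STRING,
  cf ROW<cnt BIGINT>
) WITH (
  'connector' = 'hbase-1.4',  -- illustrative identifier
  'table-name' = 'my_hbase_table',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.buffer-flush.max-rows' = '1000'
);
{code}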
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17027) Introduce a new Elasticsearch connector with new property keys

2020-04-07 Thread Jark Wu (Jira)
Jark Wu created FLINK-17027:
---

 Summary: Introduce a new Elasticsearch connector with new property 
keys
 Key: FLINK-17027
 URL: https://issues.apache.org/jira/browse/FLINK-17027
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / ElasticSearch, Table SQL / Ecosystem
Reporter: Jark Wu
 Fix For: 1.11.0


This new Elasticsearch connector should use new interfaces proposed by FLIP-95, 
e.g. DynamicTableSource, DynamicTableSink, and Factory.

The new proposed keys :
||Old key||New key||Note||
|connector.type|connector| |
|connector.version|N/A|merged into 'connector' key|
|connector.hosts|hosts| |
|connector.index|index| |
|connector.document-type|document-type| |
|connector.failure-handler|failure-handler| |
|connector.connection-max-retry-timeout|connection.max-retry-timeout| |
|connector.connection-path-prefix|connection.path-prefix| |
|connector.key-delimiter|document-id.key-delimiter|They can also be used by 
sources in the future. In addition, we prefix 'document-id' to make the meaning 
more understandable. |
|connector.key-null-literal|document-id.key-null-literal|
|connector.flush-on-checkpoint|sink.flush-on-checkpoint| |
|connector.bulk-flush.max-actions|sink.bulk-flush.max-actions|we still use 
bulk-flush, because it's an Elasticsearch term.|
|connector.bulk-flush.max-size|sink.bulk-flush.max-size| |
|connector.bulk-flush.interval|sink.bulk-flush.interval| |
|connector.bulk-flush.back-off.type|sink.bulk-flush.back-off.strategy| |
|connector.bulk-flush.back-off.max-retries|sink.bulk-flush.back-off.max-retries|
 |
|connector.bulk-flush.back-off.delay|sink.bulk-flush.back-off.delay| |
 
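A DDL using the proposed keys could look roughly like the sketch below; the connector identifier and option values are illustrative only.

{code:sql}
CREATE TABLE es_sink (
  user_id BIGINT,
  cnt BIGINT
) WITH (
  'connector' = 'elasticsearch-7',  -- illustrative identifier
  'hosts' = 'http://localhost:9200',
  'index' = 'users',
  'sink.bulk-flush.max-actions' = '1000'
);
{code}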



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17026) Introduce a new Kafka connector with new property keys

2020-04-07 Thread Jark Wu (Jira)
Jark Wu created FLINK-17026:
---

 Summary: Introduce a new Kafka connector with new property keys
 Key: FLINK-17026
 URL: https://issues.apache.org/jira/browse/FLINK-17026
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka, Table SQL / Ecosystem
Reporter: Jark Wu
 Fix For: 1.11.0


This new Kafka connector should use new interfaces proposed by FLIP-95, e.g. 
DynamicTableSource, DynamicTableSink, and Factory.

The new proposed keys :
||Old key||New key||Note||
|connector.type|connector| |
|connector.version|N/A|merged into 'connector' key|
|connector.topic|topic| |
|connector.properties.zookeeper.connect|properties.zookeeper.connect| |
|connector.properties.bootstrap.servers|properties.bootstrap.servers| |
|connector.properties.group.id|properties.group.id| |
|connector.startup-mode|scan.startup.mode| |
|connector.specific-offsets|scan.startup.specific-offsets| |
|connector.startup-timestamp-millis|scan.startup.timestamp-millis| |
|connector.sink-partitioner|sink.partitioner|"fixed", or "round-robin", or a 
class name "org.mycompany.MyPartitioner"|
|connector.sink-partitioner-class|N/A|merged into 'sink.partitioner', not 
needed anymore|
|format.type|format| |

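A DDL using the proposed keys could look roughly like the sketch below; the connector identifier and option values are illustrative only.

{code:sql}
CREATE TABLE kafka_source (
  user_id BIGINT,
  item_id BIGINT,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
{code}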
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[RESULT][VOTE] FLIP-122: New Connector Property Keys for New Factory

2020-04-07 Thread Jark Wu
Hi all,

The voting time for FLIP-122 has passed. I'm closing the vote now.

There were 8 +1 votes, 4 of which are binding:

- Timo (binding)
- Dawid (binding)
- Benchao Li (non-binding)
- Jingsong Li (binding)
- LakeShen (non-binding)
- Leonard Xu (non-binding)
- zoudan (non-binding)
- Jark (binding)

There were no disapproving votes.

Thus, FLIP-122 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

Best,
Jark


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-07 Thread Jark Wu
Thanks for the explanation. Sounds good to me.

Best,
Jark

On Tue, 7 Apr 2020 at 14:45, Dawid Wysakowicz 
wrote:

> Hi all,
>
> @Timo I'm fine with OpenContext.
>
> @Timo @Seth Sure we can combine all the parameters in a single object.
> Will update the FLIP
>
> @Jark I was aware of the implementation of SinkFunction, but it was a
> conscious choice to not do it that way.
>
> Personally I am against giving a default implementation to both the new
> and old methods. This results in an interface that by default does
> nothing or notifies the user only in the runtime, that he/she has not
> implemented a method of the interface, which does not sound like a good
> practice to me. Moreover I believe the method without a Collector will
> still be the preferred method by many users. Plus it communicates
> explicitly what is the minimal functionality required by the interface.
> Nevertheless I am happy to hear other opinions.
>
> @all I also prefer the buffering approach. Let's wait a day or two more
> to see if others think differently.
>
> Best,
>
> Dawid
>
> On 07/04/2020 06:11, Jark Wu wrote:
> > Hi Dawid,
> >
> > Thanks for driving this. This is a blocker to support Debezium CDC format
> > (FLIP-105). So big +1 from my side.
> >
> > Regarding to emitting multiple records and checkpointing, I'm also in
> favor
> > of option#1: buffer all the records outside of the checkpoint lock.
> > I think most of the use cases will not buffer larger data than
> > it's deserialized byte[].
> >
> > I have a minor suggestion on DeserializationSchema: could we have a
> default
> > implementation (maybe throw exception) for `T deserialize(byte[]
> message)`?
> > I think this will not break compatibility, and users don't have to
> > implement this deprecated interface if he/she wants to use the new
> > collector interface.
> > I think SinkFunction also did this in the same way: introduce a new
> invoke
> > method with Context parameter, and give the old invoke method an
> > empty implemention.
> >
> > Best,
> > Jark
> >
> > On Mon, 6 Apr 2020 at 23:51, Seth Wiesman  wrote:
> >
> >> I would be in favor of buffering data outside of the checkpoint lock.
> In my
> >> experience, serialization is always the biggest performance killer in
> user
> >> code and I have a hard time believing in practice that anyone is going
> to
> >> buffer so many records that is causes real memory concerns.
> >>
> >> To add to Timo's point,
> >>
> >> Statefun actually did that on its Kinesis ser/de interfaces[1,2].
> >>
> >> Seth
> >>
> >> [1]
> >>
> >>
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
> >> [2]
> >>
> >>
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
> >>
> >>
> >> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther  wrote:
> >>
> >>> Hi Dawid,
> >>>
> >>> thanks for this FLIP. This solves a lot of issues with the current
> >>> design for both the Flink contributors and users. +1 for this.
> >>>
> >>> Some minor suggestions from my side:
> >>> - How about finding something shorter for `InitializationContext`?
> Maybe
> >>> just `OpenContext`?
> >>> - While introducing default methods for existing interfaces, shall we
> >>> also create contexts for those methods? I see the following method in
> >>> your FLIP and wonder if we can reduce the number of parameters while
> >>> introducing a new method:
> >>>
> >>> deserialize(
> >>>  byte[] recordValue,
> >>>  String partitionKey,
> >>>  String seqNum,
> >>>  long approxArrivalTimestamp,
> >>>  String stream,
> >>>  String shardId,
> >>>  Collector out)
> >>>
> >>> to:
> >>>
> >>> deserialize(
> >>>  byte[] recordValue,
> >>>  Context c,
> >>>  Collector out)
> >>>
> >>> What do you think?
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>>
> >>> On 06.04.20 11:08, Dawid Wysakowi

[jira] [Created] (FLINK-17015) Fix NPE from NullAwareMapIterator

2020-04-07 Thread Jark Wu (Jira)
Jark Wu created FLINK-17015:
---

 Summary: Fix NPE from NullAwareMapIterator
 Key: FLINK-17015
 URL: https://issues.apache.org/jira/browse/FLINK-17015
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Jark Wu
 Attachments: 92164295_3052056384855585_3776552648744894464_o.jpg

When using the heap state backend, the underlying 
{{org.apache.flink.runtime.state.heap.HeapMapState#iterator}} may return a null 
iterator. As a result, the {{NullAwareMapIterator}} holds a null iterator and 
throws an NPE in the subsequent {{NullAwareMapIterator#hasNext}} invocation. 
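
A minimal illustration of the fix direction (the names below are only for illustration, not the actual Flink internals): fall back to an empty iterator when the heap state returns null, so that {{hasNext}} never dereferences a null delegate.

{code:java}
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

public class NullSafeIterators {

    // Guard against a null backing iterator before wrapping it.
    static <K, V> Iterator<Map.Entry<K, V>> orEmpty(Iterator<Map.Entry<K, V>> fromState) {
        return fromState != null ? fromState : Collections.<Map.Entry<K, V>>emptyIterator();
    }
}
{code}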



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-06 Thread Jark Wu
Hi Dawid,

Thanks for driving this. This is a blocker to support Debezium CDC format
(FLIP-105). So big +1 from my side.

Regarding emitting multiple records and checkpointing, I'm also in favor
of option #1: buffer all the records outside of the checkpoint lock.
I think most use cases will not buffer more data than
the deserialized byte[] itself.

I have a minor suggestion on DeserializationSchema: could we have a default
implementation (maybe throwing an exception) for `T deserialize(byte[] message)`?
I think this will not break compatibility, and users don't have to
implement the deprecated method if they want to use the new
collector interface.
SinkFunction did this in the same way: it introduced a new invoke
method with a Context parameter and gave the old invoke method an
empty implementation.
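
Just to illustrate what I mean (a rough sketch only; the final FLIP-124 signatures may differ):

import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.util.Collector;

public interface DeserializationSchema<T> extends Serializable {

    // old single-record method: a default that throws, so users who only
    // implement the new collector-based method are not forced to override it
    default T deserialize(byte[] message) throws IOException {
        throw new UnsupportedOperationException(
            "Implement deserialize(byte[], Collector<T>) instead.");
    }

    // new collector-based method: by default it delegates to the old one,
    // so existing implementations keep working unchanged
    default void deserialize(byte[] message, Collector<T> out) throws IOException {
        T record = deserialize(message);
        if (record != null) {
            out.collect(record);
        }
    }
}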

Best,
Jark

On Mon, 6 Apr 2020 at 23:51, Seth Wiesman  wrote:

> I would be in favor of buffering data outside of the checkpoint lock. In my
> experience, serialization is always the biggest performance killer in user
> code and I have a hard time believing in practice that anyone is going to
> buffer so many records that is causes real memory concerns.
>
> To add to Timo's point,
>
> Statefun actually did that on its Kinesis ser/de interfaces[1,2].
>
> Seth
>
> [1]
>
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
> [2]
>
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
>
>
> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther  wrote:
>
> > Hi Dawid,
> >
> > thanks for this FLIP. This solves a lot of issues with the current
> > design for both the Flink contributors and users. +1 for this.
> >
> > Some minor suggestions from my side:
> > - How about finding something shorter for `InitializationContext`? Maybe
> > just `OpenContext`?
> > - While introducing default methods for existing interfaces, shall we
> > also create contexts for those methods? I see the following method in
> > your FLIP and wonder if we can reduce the number of parameters while
> > introducing a new method:
> >
> > deserialize(
> >  byte[] recordValue,
> >  String partitionKey,
> >  String seqNum,
> >  long approxArrivalTimestamp,
> >  String stream,
> >  String shardId,
> >  Collector out)
> >
> > to:
> >
> > deserialize(
> >  byte[] recordValue,
> >  Context c,
> >  Collector out)
> >
> > What do you think?
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 06.04.20 11:08, Dawid Wysakowicz wrote:
> > > Hi devs,
> > >
> > > When working on improving the Table API/SQL connectors we faced a few
> > > shortcomings of the DeserializationSchema and SerializationSchema
> > > interfaces. Similar features were also mentioned by other users in the
> > > past. The shortcomings I would like to address with the FLIP include:
> > >
> > >   * Emitting 0 to m records from the deserialization schema with per
> > > partition watermarks
> > >   o
> https://github.com/apache/flink/pull/3314#issuecomment-376237266
> > >   o differentiate null value from no value
> > >   o support for Debezium CDC format
> > > (
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
> > )
> > >
> > >   * A way to initialize the schema
> > >   o establish external connections
> > >   o generate code on startup
> > >   o no need for lazy initialization
> > >
> > >   * Access to metrics
> > > [
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329
> > ]
> > >
> > > One important aspect I would like to hear your opinion on is how to
> > > support the Collector interface in Kafka source. Of course if we agree
> > > to add the Collector to the DeserializationSchema.
> > >
> > > The FLIP can be found here:
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988=contextnavpagetreemode
> > >
> > > Looking forward to your feedback.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> >
> >
>


Re: [DISCUSS] FLIP-105: Support to Interpret and Emit Changelog in Flink SQL

2020-04-06 Thread Jark Wu
Hi everyone,

Since this FLIP was proposed, the community has discussed a lot about the
first approach: introducing new TableSource and TableSink interfaces to
support changelog.
And yes, that is FLIP-95 which has been accepted last week. So most of the
work has been merged into FLIP-95.

In order to support the goal of FLIP-105, there is still one small thing to
discuss: how to connect to external CDC formats.
We propose to introduce 2 new formats: a Debezium format and a Canal format.
They are the most popular CDC tools according to the surveys in the user [1] and
user-zh [2] mailing lists.

I have updated the FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL

Welcome feedbacks!

Best,
Jark

[1]:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SURVEY-What-Change-Data-Capture-tools-are-you-using-td33569.html
[2]: http://apache-flink.147419.n8.nabble.com/SURVEY-CDC-td1910.html


On Fri, 14 Feb 2020 at 22:08, Jark Wu  wrote:

> Hi everyone,
>
> I would like to start discussion about how to support interpreting
> external changelog into Flink SQL, and how to emit changelog from Flink SQL.
>
> This topic has already been mentioned several times in the past. CDC
> (Change Data Capture) data has been a very important streaming data in the
> world. Connect to CDC is a significant feature for Flink, it fills the
> missing piece for Flink's streaming processing.
>
> In FLIP-105, we propose 2 approaches to achieve.
> One is introducing new TableSource interface (higher priority),
> the other is introducing new SQL syntax to interpret and emit changelog.
>
> FLIP-105:
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#
>
> Thanks for any feedback!
>
> Best,
> Jark
>


Re: [DISCUSS]FLIP-113: Support SQL and planner hints

2020-04-06 Thread Jark Wu
I'm fine with disabling this feature by default and avoiding
whitelisting/blacklisting. This simplifies a lot of things.

Regarding TableSourceFactory#Context#getExecutionOptions, do we really
need this interface?
Should the connector factory be aware of whether the properties are merged with
hints or not?
What's the problem if we always get the properties from
`CatalogTable#getProperties`?

Best,
Jark

On Tue, 7 Apr 2020 at 10:39, Kurt Young  wrote:

> Sounds like a reasonable compromise, disabling this feature by default is a
> way to protect
> the vulnerability, and we can simplify the design quite a lot. We can
> gather some users'
> feedback to see whether further protections are necessary in the future.
>
> Best,
> Kurt
>
>
> On Mon, Apr 6, 2020 at 11:49 PM Timo Walther  wrote:
>
> > I agree with Aljoscha. The length of this thread shows that this is
> > highly controversal. I think nobody really likes this feature 100% but
> > we could not find a better solution. I would consider it as a
> > nice-to-have improvement during a notebook/debugging session.
> >
> > I would accept avoiding whitelisting/blacklisting if the feature is
> > disabled by default. And we make the merged properties available in a
> > separate TableSourceFactory#Context#getExecutionOptions as Danny
> proposed.
> >
> > What do you think?
> >
> > Thanks,
> > Timo
> >
> >
> > On 06.04.20 09:59, Aljoscha Krettek wrote:
> > > The reason I'm saying it should be disabled by default is that this
> uses
> > > hint syntax, and hints should really not change query semantics.
> > >
> > > I'm quite strongly against hints that change query semantics, but if we
> > > disable this by default I would be (reluctantly) OK with the feature.
> > > Companies that create deployments or set up the SQL environment for
> > > users can enable the feature if they want.
> > >
> > > But yes, I also agree that we don't need whitelisting/blacklisting,
> > > which makes this a lot easier to do.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > On 06.04.20 04:27, Danny Chan wrote:
> > >> Hi, everyone ~
> > >>
> > >> @Aljoscha @Timo
> > >>
> > >>> I think we're designing ourselves into ever more complicated corners
> > >> here
> > >>
> > >> I kindly agree that, personally didn't see strong reasons why we
> > >> should limit on each connector properties:
> > >>
> > >> • we can define any table options for CREATE TABLE, why we treat the
> > >> dynamic options differently, we never consider any security problems
> > >> when create table, we should not either for dynamic table options
> > >> • If we do not have whitelist properties or blacklist properties, the
> > >> table source creation work would be much easier, just used the merged
> > >> options. There is no need to modify each connector to decide which
> > >> options could be overridden and how we merge them(the merge work is
> > >> redundant).
> > >> • @Timo, how about we support another interface
> > >> `TableSourceFactory#Context.getExecutionOptions`, we always use this
> > >> interface to get the options to create our table source. There is no
> > >> need to copy the catalog table itselt, we just need to generate our
> > >> Context correctly.
> > >> • @Aljoscha I agree to have a global config option, but I disagree to
> > >> default disable it, a global default config would break the user
> > >> experience too much, especially when user want to modify the options
> > >> in a ad-hoc way.
> > >>
> > >>
> > >>
> > >> I suggest to remove `TableSourceFactory#supportedHintOptions` or
> > >> `TableSourceFactory#forbiddenHintOptions` based on the fact that we
> > >> does not have black/white list for CREATE TABLE at all at lease for
> > >> current codebase.
> > >>
> > >>
> > >> @Timo (i have replied offline but allows to represent it here again)
> > >>
> > >> The `TableSourceFactory#supportedHintOptions` doesn't work well for 3
> > >> reasons compared to `TableSourceFactory#forbiddenHintOptions`:
> > >> 1. For key with wildcard, like connector.property.* , use a blacklist
> > >> make us have the ability to disable some of the keys under that, i.e.
> > >> connector.property.key1 , a whitelist can only match with prefix
> > >>
> > >> 2. We want the connectors to have the ability to disable format type
> > >> switch format.type but allows all the other properties, e.g. format.*
> > >> without format.type(let's call it SET_B), if we use the whitelist, we
> > >> have to enumerate all the specific format keys start with format
> > >> (SET_B), but with the old connector factories, we have no idea what
> > >> specific format keys it supports(there is either a format.* or
> nothing).
> > >>
> > >> 3. Except the cases for 1 and 2, for normal keys(no wildcard), the
> > >> blacklist and whitelist has the same expressiveness, use blacklist
> > >> makes the code not too verbose to enumerate all the duplicate keys
> > >> with #supportedKeys .(Not very strong reason, but i think as a
> > >> connector developer, it makes sense)
> > >>
> > >> Best,
> > >> 

[VOTE] FLIP-122: New Connector Property Keys for New Factory

2020-04-02 Thread Jark Wu
Hi all,

I would like to start the vote for FLIP-122 [1], which is discussed and
reached a consensus in the discussion thread [2].

The vote will be open for at least 72h, unless there is an objection or not
enough votes.

Thanks,
Timo

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-122-New-Connector-Property-Keys-for-New-Factory-td39462.html


Re: [VOTE] FLIP-95: New TableSource and TableSink interfaces

2020-04-02 Thread Jark Wu
Hi Timo,

I don't think a source should work with `CatalogTableSchema`. So far, a table
source doesn't need to know the logical information about computed columns and
watermarks.
IMO, we should provide a method in the source factory to convert a
`CatalogTableSchema` into a `TableSchema` without computed columns,
and a source should just hold the `TableSchema`.

I agree that doing the intersection/diff logic is trivial, but maybe we can
provide utilities to do that, so that we can keep the interface clean.


Best,
Jark


On Thu, 2 Apr 2020 at 20:17, Timo Walther  wrote:

> Hi Jark,
>
> if catalogs use `CatalogTableSchema` in the future. The source would
> internally also work with `CatalogTableSchema`. I'm fine with cleaning
> up the `TableSchema` class but should a source deal with two different
> schema classes then?
>
> Another problem that I see is that connectors usually need to perform
> some index arithmetics. Dealing with TableSchema and additionally within
> a field with DataType might be a bit inconvenient. A dedicated class
> with utilities might be helpful such that not every source needs to
> implement the same intersection/diff logic again.
>
> Regards,
> Timo
>
>
> On 02.04.20 14:06, Jark Wu wrote:
> > Hi Dawid,
> >
> >> How to express projections with TableSchema?
> > The TableSource holds the original TableSchema (i.e. from DDL) and the
> > pushed TableSchema represents the schema after projection.
> > Thus the table source can compare them to figure out changed field orders
> > or not matched types.
> > For most sources who maps physical storage by field names (e.g. jdbc,
> > hbase, json) they can just simply apply the pushed TableSchema.
> > But sources who maps by field indexes (e.g. csv), they need to figure out
> > the projected indexes by comparing the original and projected schema.
> > For example, the original schema is [a: String, b: Int, c: Timestamp],
> and
> > b is pruned, then the pushed schema is [a: String, c: Timestamp]. So the
> > source can figure out index=1 is pruned.
> >
> >> How do we express projection of a nested field with TableSchema?
> > This is the same to the above one. For example, the original schema is
> [rk:
> > String, f1 Row].
> > If `f1.q1` is pruned, the pushed schema will be [rk: String, f1 Row > Double>].
> >
> >> TableSchema might be used at too many different places for different
> > responsibilities.
> > Agree. We have recognized that a structure and builder for pure table
> > schema is required in many places. But we mixed many concepts of catalog
> > table schema in TableSchema.
> > IIRC, in an offline discussion of FLIP-84, we want to introduce a new
> > `CatalogTableSchema` to represent the schema part of a DDL,
> > and remove all the watermark, computed column information from
> TableSchema?
> > Then `TableSchema` can continue to serve as a pure table schema and it
> > stays in a good package.
> >
> > Best,
> > Jark
> >
> >
> >
> >
> > On Thu, 2 Apr 2020 at 19:39, Timo Walther  wrote:
> >
> >> Hi Dawid,
> >>
> >> thanks for your feedback. I agree with your concerns. I also observed
> >> that TableSchema might be used at too many different places for
> >> different responsibilities.
> >>
> >> How about we introduce a helper class for `SupportsProjectionPushDown`
> >> and also `LookupTableSource#Context#getKeys()` to represent nested
> >> structure of names. Data types, constraints, or computed columns are not
> >> necessary at those locations.
> >>
> >> We can also add utility methods for connectors to this helper class
> >> there to quickly figuring out differences between the original table
> >> schema and the new one.
> >>
> >> SelectedFields {
> >>
> >>  private LogicalType orignalRowType; // set by the planner
> >>
> >>  private int[][] indices;
> >>
> >>  getNames(int... at): String[]
> >>
> >>  getNames(String... at): String[]
> >>
> >>  getIndices(int... at): int[]
> >>
> >>  getNames(String... at): String[]
> >>
> >>  toTableSchema(): TableSchema
> >> }
> >>
> >> What do others think?
> >>
> >> Thanks,
> >> Timo
> >>
> >>
> >>
> >> On 02.04.20 12:28, Dawid Wysakowicz wrote:
> >>> Generally +1
> >>>
> >>> One slight concern I have is about the |SupportsProjectionPushDown.|I
> >>> don't nec

Re: [VOTE] FLIP-95: New TableSource and TableSink interfaces

2020-04-02 Thread Jark Wu
Hi Dawid,

> How to express projections with TableSchema?
The TableSource holds the original TableSchema (i.e. from DDL) and the
pushed TableSchema represents the schema after projection.
Thus the table source can compare them to figure out changed field orders
or mismatched types.
Most sources that map to physical storage by field names (e.g. jdbc,
hbase, json) can simply apply the pushed TableSchema.
But sources that map by field indexes (e.g. csv) need to figure out
the projected indexes by comparing the original and projected schemas.
For example, if the original schema is [a: String, b: Int, c: Timestamp] and
b is pruned, then the pushed schema is [a: String, c: Timestamp]. So the
source can figure out that index=1 is pruned.
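
In code, the index arithmetic boils down to something like this (just a sketch;
the helper below is illustrative, not an existing utility):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProjectionIndexes {

    // For each field kept by the pushed schema, find its index in the original schema.
    static int[] projectedIndexes(String[] originalFieldNames, String[] pushedFieldNames) {
        List<String> original = Arrays.asList(originalFieldNames);
        List<Integer> indexes = new ArrayList<>();
        for (String field : pushedFieldNames) {
            indexes.add(original.indexOf(field));
        }
        return indexes.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        // original [a, b, c], pushed [a, c] -> kept indexes [0, 2], i.e. index 1 (b) was pruned
        int[] kept = projectedIndexes(new String[]{"a", "b", "c"}, new String[]{"a", "c"});
        System.out.println(Arrays.toString(kept)); // prints [0, 2]
    }
}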

> How do we express projection of a nested field with TableSchema?
This is the same as the above. For example, suppose the original schema is
[rk: String, f1 Row<...>], where the nested row f1 contains the fields q1 and q2 (a Double).
If `f1.q1` is pruned, the pushed schema will be [rk: String, f1 Row<q2 Double>].

> TableSchema might be used at too many different places for different
responsibilities.
Agreed. We have recognized that a structure and builder for a pure table
schema is required in many places, but we mixed many catalog-table-schema
concepts into TableSchema.
IIRC, in an offline discussion of FLIP-84, we wanted to introduce a new
`CatalogTableSchema` to represent the schema part of a DDL
and remove all the watermark and computed-column information from TableSchema.
Then `TableSchema` can continue to serve as a pure table schema, and it
stays in a good package.

Best,
Jark




On Thu, 2 Apr 2020 at 19:39, Timo Walther  wrote:

> Hi Dawid,
>
> thanks for your feedback. I agree with your concerns. I also observed
> that TableSchema might be used at too many different places for
> different responsibilities.
>
> How about we introduce a helper class for `SupportsProjectionPushDown`
> and also `LookupTableSource#Context#getKeys()` to represent nested
> structure of names. Data types, constraints, or computed columns are not
> necessary at those locations.
>
> We can also add utility methods for connectors to this helper class
> there to quickly figuring out differences between the original table
> schema and the new one.
>
> SelectedFields {
>
> private LogicalType orignalRowType; // set by the planner
>
> private int[][] indices;
>
> getNames(int... at): String[]
>
> getNames(String... at): String[]
>
> getIndices(int... at): int[]
>
> getNames(String... at): String[]
>
> toTableSchema(): TableSchema
> }
>
> What do others think?
>
> Thanks,
> Timo
>
>
>
> On 02.04.20 12:28, Dawid Wysakowicz wrote:
> > Generally +1
> >
> > One slight concern I have is about the |SupportsProjectionPushDown.|I
> > don't necessarily understand how can we express projections with
> > TableSchema. It's unclear for me what happens when a type of a field
> > changes, fields are in a different order, when types do not match. How
> > do we express projection of a nested field with TableSchema?
> >
> > I don't think this changes the core design presented in the FLIP,
> > therefore I'm fine with accepting the FLIP. I wanted to mention my
> > concerns, so that maybe we can adjust the passed around structures
> slightly.
> >
> > Best,
> >
> > Dawid
> > ||
> >
> > On 30/03/2020 14:42, Leonard Xu wrote:
> >> +1(non-binding)
> >>
> >> Best,
> >> Leonard Xu
> >>
> >>> On 30 Mar 2020 at 16:43, Jingsong Li wrote:
> >>>
> >>> +1
> >>>
> >>> Best,
> >>> Jingsong Lee
> >>>
> >>> On Mon, Mar 30, 2020 at 4:41 PM Kurt Young  wrote:
> >>>
> >>>> +1
> >>>>
> >>>> Best,
> >>>> Kurt
> >>>>
> >>>>
> >>>> On Mon, Mar 30, 2020 at 4:08 PM Benchao Li
> wrote:
> >>>>
> >>>>> +1 (non-binding)
> >>>>>
> >>>>> On Mon, 30 Mar 2020 at 15:57, Jark Wu wrote:
> >>>>>
> >>>>>> +1 from my side.
> >>>>>>
> >>>>>> Thanks Timo for driving this.
> >>>>>>
> >>>>>> Best,
> >>>>>> Jark
> >>>>>>
> >>>>>> On Mon, 30 Mar 2020 at 15:36, Timo Walther
> wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> I would like to start the vote for FLIP-95 [1], which is discussed
> >>>> and
> >>>>>>> reached a consensus in the discussion thread [2].
> >>>>>>>
> >>>>>>> The vote will be open until April 2nd (72h), unless there is an
> >>>>>>> objection or not enough votes.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>>>
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >>>>>>> [2]
> >>>>>>>
> >>>>>>>
> >>>>
> https://lists.apache.org/thread.html/r03cbce8996fd06c9b0406c9ddc0d271bd456f943f313b9261fa061f9%40%3Cdev.flink.apache.org%3E
> >>>>> --
> >>>>>
> >>>>> Benchao Li
> >>>>> School of Electronics Engineering and Computer Science, Peking
> University
> >>>>> Tel:+86-15650713730
> >>>>> Email:libenc...@gmail.com;libenc...@pku.edu.cn
> >>>>>
> >>> --
> >>> Best, Jingsong Lee
>
>


Re: [ANNOUNCE] New Committers and PMC member

2020-04-01 Thread Jark Wu
Congratulations to you all!

Best,
Jark

On Wed, 1 Apr 2020 at 20:33, Kurt Young  wrote:

> Congratulations to you all!
>
> Best,
> Kurt
>
>
> On Wed, Apr 1, 2020 at 7:41 PM Danny Chan  wrote:
>
> > Congratulations!
> >
> > Best,
> > Danny Chan
> > On 1 Apr 2020 at 19:36 (+0800), dev@flink.apache.org wrote:
> > >
> > > Congratulations!
> >
>


Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-04-01 Thread Jark Wu
Hi everyone,

If there are no objections, I would like to start a voting thread by
tomorrow. So this is the last call to give feedback for FLIP-122.

Cheers,
Jark

On Wed, 1 Apr 2020 at 16:30, zoudan  wrote:

> Hi Jark,
> Thanks for the proposal.
> I like the idea that we put the version in ‘connector’ field. That will be
> friendly for existing jobs as some of existing connectors may do not
> contains  ‘connector.version’.
>
> Best,
> Dan Zou
>
>
>


Re: [DISCUSS] Change default planner to blink planner in 1.11

2020-03-31 Thread Jark Wu
+1 to making the blink planner the default planner.

We should give the blink planner more exposure to encourage users to try out
new features and lead them to migrate to the blink planner.

Glad to see the blink planner has been used in production since 1.9! @Benchao

Best,
Jark

On Wed, 1 Apr 2020 at 11:31, Benchao Li  wrote:

> Hi Kurt,
>
> It's excited to hear that the community aims to make Blink Planner default
> in 1.11.
>
> We have been using blink planner since 1.9 for streaming processing, it
> works very well,
> and covers many use cases in our company.
> So +1 to make it default in 1.11 from our side.
>
> On Wed, 1 Apr 2020 at 09:15, Kurt Young wrote:
>
>> Hi Dev and User,
>>
>> Blink planner for Table API & SQL is introduced in Flink 1.9 and already
>> be the default planner for
>> SQL client in Flink 1.10. And since we already decided not introducing
>> any new features to the
>> original Flink planner, it already lacked of so many great features that
>> the community has been working on, such as brand new type system, more DDL
>> support and more planner capabilities.
>> During this time, we've also received lots of great feedback from users
>> who were trying to use blink
>> planner, both positive and negative (like bugs). This is a good sign, it
>> at least shows more and more
>> users are starting to try out.
>>
>> So I want to start this discussion more formally to talk about
>> replacing the default planner to blink.
>> Specifically, I want to gather feedbacks from dev and user about whether
>> blink planner already
>> cover the original planner's capabilities, what kind of issues you've ran
>> into when try out blink
>> planner and then make you fallback to original one. Since there is still
>> a month to go when feature
>> freeze, there's still enough time for community to further enhance blink
>> planner for this purpose.
>>
>> Let me know what you think, especially if you want to report or complain
>> about something. Thanks
>> in advance.
>>
>> Best,
>> Kurt
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


[jira] [Created] (FLINK-16889) Support converting BIGINT to TIMESTAMP for TO_TIMESTAMP function

2020-03-31 Thread Jark Wu (Jira)
Jark Wu created FLINK-16889:
---

 Summary: Support converting BIGINT to TIMESTAMP for TO_TIMESTAMP 
function
 Key: FLINK-16889
 URL: https://issues.apache.org/jira/browse/FLINK-16889
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Planner
Reporter: Jark Wu
 Fix For: 1.11.0


Many users have reported that it is currently not possible to define an event-time 
field or a watermark on a BIGINT column. 

Impala supports converting a BIGINT into a TIMESTAMP value using the TO_TIMESTAMP() 
function [1]. The BIGINT argument represents the number of seconds past the 
epoch. 

I think it makes sense to support this for Flink's TO_TIMESTAMP(). It is the 
converse of the UNIX_TIMESTAMP() function, which produces a BIGINT representing 
the number of seconds past the epoch.
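
For illustration, this is roughly the conversion the proposed overload would perform, expressed in plain Java (timezone handling details aside; the input value below is hypothetical):

{code:java}
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class EpochSecondsToTimestamp {
    public static void main(String[] args) {
        long epochSeconds = 1585699200L; // hypothetical BIGINT input (seconds past the epoch)
        LocalDateTime ts = LocalDateTime.ofEpochSecond(epochSeconds, 0, ZoneOffset.UTC);
        System.out.println(ts); // 2020-04-01T00:00
    }
}
{code}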


[1]: 
https://impala.apache.org/docs/build/html/topics/impala_datetime_functions.html#datetime_functions__to_timestamp





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-31 Thread Jark Wu
Hi everyone,

In order not to postpone FLIP-95 further, I have included the removal of
Factory#factoryVersion in this FLIP.
I updated the "Proposed Changes" section to reflect the changes.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

Please let me know if you have other questions.

Best,
Jark


On Wed, 1 Apr 2020 at 00:56, Jark Wu  wrote:

> Hi, Dawid
>
> Regarding to `connector.property-version`,
> I totally agree with you we should implicitly add a "property-version=1"
> (without 'connector.' prefix) property for future evolving. So I updated
> FLIP for this.
> However, I still doubt to use property version to distinguish old/new
> factory. Because it will break existing DDLs, unless users manually set
> `connector.property-version=1` to their existing DDLs. So I still prefer
> to use `connector` vs `connector.type` to distinguish old/new factory.
>
> 
>
> Hi Timo,
>
> +1 to zookeeper.znode-parent
>
> > sink.bulk-flush -> sink.buffer-flush?
> I would like to keep using bulk-flush, because "bulk" is a well-known
> Elasticsearch API and terminology [1].
> I think we don't need to align all the terminologies across Flink
> connectors. Following the external system's
> terminology will be more easy-to-understand for connector users.
>
> > username -> secrect.username?
> That's a good idea to hide secret values in logs. However, is there a
> better way to do that? For example, add a secretOptions() method to Factory?
> IMO, a `secrect.` prefix is too weak and limit the design of a property
> key. For example, we want to support authentication for elasticserch [2],
> a possible property keys will be `authentication.enabled=true`,
> `authentication.username=jark`, `authentication.password=123456`.
>
> [1]:
> https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
> [2]: https://issues.apache.org/jira/browse/FLINK-16788
> 
>
> Hi Zhenghua,
>
> > does this affect descriptors and related validators?
> No. As described in the compatiblity section, all the old properties will
> be routed to the old factories.
> So all the current descriptors (will be translated to old property keys)
> are still compatible.
> But, we should have a plan to translate current descritors into new
> property keys.
> However, that is not in the scope of this FLIP and could be done in a
> separate simple JIRA issue.
>
> Best,
> Jark
>
> On Tue, 31 Mar 2020 at 16:08, Zhenghua Gao  wrote:
>
>> Hi Jark,
>>
>> Thanks for the proposal. I'm +1 since it's more simple and clear for sql
>> users.
>> I have a question about this: does this affect descriptors and related
>> validators?
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Mon, Mar 30, 2020 at 2:02 PM Jark Wu  wrote:
>>
>> > Hi everyone,
>> >
>> > I want to start a discussion about further improve and simplify our
>> current
>> > connector porperty keys, aka WITH options. Currently, we have a
>> > 'connector.' prefix for many properties, but they are verbose, and we
>> see a
>> > big inconsistency between the properties when designing FLIP-107.
>> >
>> > So we propose to remove all the 'connector.' prefix and rename
>> > 'connector.type' to 'connector', 'format.type' to 'format'. So a new
>> Kafka
>> > DDL may look like this:
>> >
>> > CREATE TABLE kafka_table (
>> >  ...
>> > ) WITH (
>> >  'connector' = 'kafka',
>> >  'version' = '0.10',
>> >  'topic' = 'test-topic',
>> >  'startup-mode' = 'earliest-offset',
>> >  'properties.bootstrap.servers' = 'localhost:9092',
>> >  'properties.group.id' = 'testGroup',
>> >  'format' = 'json',
>> >  'format.fail-on-missing-field' = 'false'
>> > );
>> >
>> > The new connector property key set will come together with new Factory
>> > inferface which is proposed in FLIP-95. Old properties are still
>> compatible
>> > with their existing implementation. New properties are only available in
>> > new DynamicTableFactory implementations.
>> >
>> > You can access the detailed FLIP here:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
>> >
>> > Best,
>> > Jark
>> >
>>
>


Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-31 Thread Jark Wu
Hi, Dawid

Regarding `connector.property-version`:
I totally agree with you that we should implicitly add a "property-version=1"
property (without the 'connector.' prefix) for future evolution. So I updated
the FLIP for this.
However, I still doubt that the property version should be used to distinguish
the old and new factories, because it would break existing DDLs unless users manually add
`connector.property-version=1` to their existing DDLs. So I still prefer to
use `connector` vs `connector.type` to distinguish the old and new factories.



Hi Timo,

+1 to zookeeper.znode-parent

> sink.bulk-flush -> sink.buffer-flush?
I would like to keep using bulk-flush, because "bulk" is a well-known
Elasticsearch API and terminology [1].
I think we don't need to align all terminology across Flink
connectors. Following the external system's
terminology is easier to understand for connector users.

> username -> secret.username?
That's a good idea to hide secret values in logs. However, is there a
better way to do that? For example, adding a secretOptions() method to Factory?
IMO, a `secret.` prefix is too weak and limits the design of a property
key. For example, if we want to support authentication for Elasticsearch [2],
possible property keys would be `authentication.enabled=true`,
`authentication.username=jark`, `authentication.password=123456`.

[1]:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
[2]: https://issues.apache.org/jira/browse/FLINK-16788


Hi Zhenghua,

> does this affect descriptors and related validators?
No. As described in the compatibility section, all the old properties will
be routed to the old factories.
So all the current descriptors (which will be translated to old property keys)
are still compatible.
But we should have a plan to translate the current descriptors into the new
property keys.
However, that is not in the scope of this FLIP and could be done in a
separate simple JIRA issue.

Best,
Jark

On Tue, 31 Mar 2020 at 16:08, Zhenghua Gao  wrote:

> Hi Jark,
>
> Thanks for the proposal. I'm +1 since it's more simple and clear for sql
> users.
> I have a question about this: does this affect descriptors and related
> validators?
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Mon, Mar 30, 2020 at 2:02 PM Jark Wu  wrote:
>
> > Hi everyone,
> >
> > I want to start a discussion about further improve and simplify our
> current
> > connector porperty keys, aka WITH options. Currently, we have a
> > 'connector.' prefix for many properties, but they are verbose, and we
> see a
> > big inconsistency between the properties when designing FLIP-107.
> >
> > So we propose to remove all the 'connector.' prefix and rename
> > 'connector.type' to 'connector', 'format.type' to 'format'. So a new
> Kafka
> > DDL may look like this:
> >
> > CREATE TABLE kafka_table (
> >  ...
> > ) WITH (
> >  'connector' = 'kafka',
> >  'version' = '0.10',
> >  'topic' = 'test-topic',
> >  'startup-mode' = 'earliest-offset',
> >  'properties.bootstrap.servers' = 'localhost:9092',
> >  'properties.group.id' = 'testGroup',
> >  'format' = 'json',
> >  'format.fail-on-missing-field' = 'false'
> > );
> >
> > The new connector property key set will come together with new Factory
> > inferface which is proposed in FLIP-95. Old properties are still
> compatible
> > with their existing implementation. New properties are only available in
> > new DynamicTableFactory implementations.
> >
> > You can access the detailed FLIP here:
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> >
> > Best,
> > Jark
> >
>


Re: [VOTE] FLIP-110: Support LIKE clause in CREATE TABLE

2020-03-31 Thread Jark Wu
+1 from my side. This will be a very useful feature. 

Best,
Jark

> On 31 Mar 2020 at 18:15, Danny Chan wrote:
> 
> +1 for this feature, although the WITH syntax breaks the SQL standard, but 
> it’s compatible with our CREATE TABLE syntax, seems well from my side.
> 
> Best,
> Danny Chan
> On 31 Mar 2020 at 17:46 (+0800), Dawid Wysakowicz wrote:
>> Hi,
>> 
>> Just wanted to notify the voters that after a comment from Jingsong I
>> introduced a new like-option in the FLIP. Because it happened very short
>> after the vote started I will not cancel the vote (only Timo voted
>> before the changed).
>> 
>> Feel free to change your votes if you disagree. Sorry for the inconvenience.
>> 
>> Best,
>> 
>> Dawid
>> 
>> On 31/03/2020 09:43, Timo Walther wrote:
>>> +1 this will reduce manual schema work a lot!
>>> 
>>> Thanks,
>>> Timo
>>> 
>>> On 31.03.20 09:33, Dawid Wysakowicz wrote:
 Hi all,
 
 I would like to start the vote for FLIP-110 [1], which is discussed and
 reached a consensus in the discussion thread [2].
 
 The vote will be open until April 3rd (72h), unless there is an
 objection or not enough votes.
 
 Best,
 
 Dawid
 
 [1]
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-110%3A+Support+LIKE+clause+in+CREATE+TABLE
 
 [2]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-110-Support-LIKE-clause-in-CREATE-TABLE-td38378.html/
 /
 
>>> 
>> 



[jira] [Created] (FLINK-16887) Refactor retraction rules to support inferring ChangelogMode

2020-03-31 Thread Jark Wu (Jira)
Jark Wu created FLINK-16887:
---

 Summary: Refactor retraction rules to support inferring 
ChangelogMode
 Key: FLINK-16887
 URL: https://issues.apache.org/jira/browse/FLINK-16887
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.11.0


The current retraction mechanism only supports 2 message kinds (+ and -). However, 
with FLIP-95, we will expose more message kinds to users 
(insert/delete/update_before/update_after). 

In order to support that, we should first refactor the current retraction rules to 
support ChangelogMode inference. Previously, every node was attached with 
an AccMode trait after the retraction rules. In the proposed design, we will infer 
a ChangelogMode trait for every node. 
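
As a sketch (not the actual planner classes), the changelog mode of a node can be modeled as the set of message kinds it produces, which is what the refactored rules would infer for every node:

{code:java}
import java.util.EnumSet;

public class ChangelogModeSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    public static void main(String[] args) {
        EnumSet<Kind> insertOnly = EnumSet.of(Kind.INSERT);   // e.g. an append-only scan
        EnumSet<Kind> updating = EnumSet.allOf(Kind.class);   // e.g. a non-windowed aggregation
        System.out.println(insertOnly + " / " + updating);
    }
}
{code}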

A detailed design document will be attached soon. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-31 Thread Jark Wu
Hi everyone,

Thanks for the great feedbacks so far.

I updated the FLIP documentation according to the discussion. Changes
include:
- removed the "version" key and merged it into "connector"
- added a "scan", "lookup", or "sink" prefix to property keys that are
only used in the corresponding case
- added a "New Property Key" section listing all the previous property keys
and the new property keys.

We use "scan" and "lookup" instead of "source" prefix because we should
distinguish them and they aligns to FLIP-95 ScanTableSource and
LookupTableSource.
I also colored red for some major change of property keys in the FLIP. I
will list some of them here too:

kafka:
connector.startup-mode => scan.startup.mode
connector.specific-offsets => scan.startup.specific-offsets
connector.startup-timestamp-millis => scan.startup.timestamp-millis
connector.sink-partitioner & connector.sink-partitioner-class =>
sink.partitioner

elasticsearch:
connector.key-delimiter => document-id.key-delimiter  # make it
explicit that it is used for document id
connector.key-null-literal => document-id.key-null-literal  # and
it also can be used for es sources in the future
connector.bulk-flush.back-off.type => sink.bulk-flush.back-off.strategy

jdbc:
connector.table => table-name

Welcome further feedbacks!

Best,
Jark


On Tue, 31 Mar 2020 at 14:45, Jark Wu  wrote:

> Hi Kurt,
>
> I also prefer "-" as version delimiter now. I didn't remove the "_"
> proposal by mistake, that's why I sent another email last night :)
> Regarding to "property-version", I also think we shouldn't let users to
> learn about this. And ConfigOption provides a good ability
> to support deprecated keys and auto-generate documentation for deprecated
> keys.
>
> Hi Danny,
>
> Regarding to “connector.properties.*”:
> In FLIP-95, the Factory#requiredOptions() and Factory#optionalOptions()
> inferfaces are only used for generation of documentation.
> It does not influence the discovery and validation of a factory. The
> validation logic is defined by connectors
> in createDynamicTableSource/Sink().
> So you don't have to provide an option for "connector.properties.*". But I
> think we should make ConfigOption support wildcard in the long term for a
> full story.
>
> I don't think we should inline all the "connector.properties.*",
> otherwise, it will be very tricky for users to configure the properties.
> Regarding to FLIP-113, I suggest to provide some ConfigOptions for
> commonly used kafka properties and put them in the supportedHintOptions(),
> e.g. "connector.properties.group.id",
> "connector.properties.fetch.min.bytes".
>
> Best,
> Jark
>
>
>
>
>
> On Tue, 31 Mar 2020 at 12:04, Danny Chan  wrote:
>
>> Thanks Jark for bring up this discussion, +1 for this idea, I believe the
>> user has suffered from the verbose property key for long time.
>>
>> Just one question, how do we handle the keys with wildcard, such as the
>> “connector.properties.*” in Kafka connector which would then hand-over to
>> Kafka client directly. As what suggested in FLIP-95, we use a ConfigOption
>> to describe the “supported properties”, then I have to concerns:
>>
>> • For the new keys, do we still need to put multi-lines there the such
>> key, such as “connector.properties.abc” “connector.properties.def”, or
>> should we inline them, such as “some-key-prefix” = “k1=v1, k2=v2 ..."
>> • Should the ConfigOption support the wildcard ? (If we plan to support
>> the current multi-line style)
>>
>>
>> Best,
>> Danny Chan
> On 31 Mar 2020 at 00:37 (+0800), Jark Wu wrote:
>> > Hi all,
>> >
>> > Thanks for the feedbacks.
>> >
>> > It seems that we have a conclusion to put the version into the factory
>> > identifier. I'm also fine with this.
>> > If we have this outcome, the interface of Factory#factoryVersion is not
>> > needed anymore, this can simplify the learning cost of new factory.
>> > We may need to update FLIP-95 and re-vote for it? cc @Timo Walther
>> > 
>> >
>> > kafka => kafka for 0.11+ versions, we don't suffix "-universal", because
>> > the meaning of "universal" not easy to understand.
>> > kafka-0.11 => kafka for 0.11 version
>> > kafka-0.10 => kafka for 0.10 version
>> > elasticsearch-6 => elasticsearch for 6.x versions
>> > elasticsearch-7 => elasticsearch for 7.x versions
>> > hbase-1.4 => hbase for 1.4.x versions
>> > jdbc
>> > filesystem
>> >
>&g

Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-31 Thread Jark Wu
Hi Kurt,

I also prefer "-" as the version delimiter now. By mistake I didn't remove the "_"
proposal from my first email, which is why I sent another email last night :)
Regarding "property-version", I also think we shouldn't make users
learn about this. And ConfigOption provides a good way
to support deprecated keys and to auto-generate documentation for deprecated
keys.
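
For example, something along these lines (the option and key names are only for
illustration):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class KafkaOptions {

    // A new key can declare its legacy key as deprecated: the configuration still
    // reads the old key as a fallback, and the documentation generator marks it
    // as deprecated.
    public static final ConfigOption<String> SCAN_STARTUP_MODE =
            ConfigOptions.key("scan.startup.mode")
                    .stringType()
                    .noDefaultValue()
                    .withDeprecatedKeys("connector.startup-mode")
                    .withDescription("Startup mode for the Kafka scan source.");
}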

Hi Danny,

Regarding “connector.properties.*”:
In FLIP-95, the Factory#requiredOptions() and Factory#optionalOptions()
interfaces are only used for generating documentation.
They do not influence the discovery and validation of a factory. The
validation logic is defined by the connectors
in createDynamicTableSource/Sink().
So you don't have to provide an option for “connector.properties.*”. But I
think we should make ConfigOption support wildcards in the long term for a
full story.

I don't think we should inline all the “connector.properties.*” keys;
otherwise, it will be very tricky for users to configure the properties.
Regarding FLIP-113, I suggest providing ConfigOptions for commonly
used Kafka properties and putting them in supportedHintOptions(),
e.g. “connector.properties.group.id” and
“connector.properties.fetch.min.bytes”.

Best,
Jark





On Tue, 31 Mar 2020 at 12:04, Danny Chan  wrote:

> Thanks Jark for bring up this discussion, +1 for this idea, I believe the
> user has suffered from the verbose property key for long time.
>
> Just one question, how do we handle the keys with wildcard, such as the
> “connector.properties.*” in Kafka connector which would then hand-over to
> Kafka client directly. As what suggested in FLIP-95, we use a ConfigOption
> to describe the “supported properties”, then I have to concerns:
>
> • For the new keys, do we still need to put multi-lines there the such
> key, such as “connector.properties.abc” “connector.properties.def”, or
> should we inline them, such as “some-key-prefix” = “k1=v1, k2=v2 ..."
> • Should the ConfigOption support the wildcard ? (If we plan to support
> the current multi-line style)
>
>
> Best,
> Danny Chan
> On 31 Mar 2020 at 00:37 (+0800), Jark Wu wrote:
> > Hi all,
> >
> > Thanks for the feedbacks.
> >
> > It seems that we have a conclusion to put the version into the factory
> > identifier. I'm also fine with this.
> > If we have this outcome, the interface of Factory#factoryVersion is not
> > needed anymore, this can simplify the learning cost of new factory.
> > We may need to update FLIP-95 and re-vote for it? cc @Timo Walther
> > 
> >
> > kafka => kafka for 0.11+ versions, we don't suffix "-universal", because
> > the meaning of "universal" not easy to understand.
> > kafka-0.11 => kafka for 0.11 version
> > kafka-0.10 => kafka for 0.10 version
> > elasticsearch-6 => elasticsearch for 6.x versions
> > elasticsearch-7 => elasticsearch for 7.x versions
> > hbase-1.4 => hbase for 1.4.x versions
> > jdbc
> > filesystem
> >
> > We use "-" as the version delimiter to make them to be more consistent.
> > This is not forces, users can also use other delimiters or without
> > delimiter.
> > But this can be a guilde in the Javadoc of Factory, to make the connector
> > ecosystem to be more consistent.
> >
> > What do you think?
> >
> > 
> >
> > Regarding "connector.property-version":
> >
> > Hi @Dawid Wysakowicz  , the new fatories are
> > designed not support to read current properties.
> > All the current properties are routed to the old factories if they are
> > using "connector.type". Otherwise, properties are routed to new
> factories.
> >
> > If I understand correctly, the "connector.property-version" is attched
> > implicitly by system, not manually set by users.
> > For example, the framework should add "connector.property-version=1" to
> > properties when processing DDL statement.
> > I'm fine to add a "connector.property-version=1" when processing DDL
> > statement, but I think it's also fine if we don't,
> > because this can be done in the future if need and the default version
> can
> > be 1.
> >
> > Best,
> > Jark
> >
> > On Tue, 31 Mar 2020 at 00:36, Jark Wu  wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the feedbacks.
> > >
> > > It seems that we have a conclusion to put the version into the factory
> > > identifier. I'm also fine with this.
> > > If we have this outcome, the interface of Factory#factoryVersion is not
> > > needed anymore, this can 

Re: [DISCUSS] FLIP-84 Feedback Summary

2020-03-30 Thread Jark Wu
ATALOG 'mycat';
> >> INSERT INTO t1 SELECT * FROM s;
> >> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>
> >> For executeMultilineSql():
> >>
> >> sync because regular SQL
> >> sync because regular Batch SQL
> >> async because Streaming SQL
> >>
> >> For executeAsyncMultilineSql():
> >>
> >> async because everything should be async
> >> async because everything should be async
> >> async because everything should be async
> >>
> >> What we should not start for executeAsyncMultilineSql():
> >>
> >> sync because DDL
> >> async because everything should be async
> >> async because everything should be async
> >>
> >> What are you thoughts here?
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 26.03.20 12:50, godfrey he wrote:
> >>> Hi Timo,
> >>>
> >>> I agree with you that streaming queries mostly need async execution.
> >>> In fact, our original plan is only introducing sync methods in this
> FLIP,
> >>> and async methods (like "executeSqlAsync") will be introduced in the
> >> future
> >>> which is mentioned in the appendix.
> >>>
> >>> Maybe the async methods also need to be considered in this FLIP.
> >>>
> >>> I think sync methods is also useful for streaming which can be used to
> >> run
> >>> bounded source.
> >>> Maybe we should check whether all sources are bounded in sync execution
> >>> mode.
> >>>
> >>>> Also, if we block for streaming queries, we could never support
> >>>> multiline files. Because the first INSERT INTO would block the further
> >>>> execution.
> >>> agree with you, we need async method to submit multiline files,
> >>> and files should be limited that the DQL and DML should be always in
> the
> >>> end for streaming.
> >>>
> >>> Best,
> >>> Godfrey
> >>>
> >>> Timo Walther  于2020年3月26日周四 下午4:29写道:
> >>>
> >>>> Hi Godfrey,
> >>>>
> >>>> having control over the job after submission is a requirement that was
> >>>> requested frequently (some examples are [1], [2]). Users would like to
> >>>> get insights about the running or completed job. Including the jobId,
> >>>> jobGraph etc., the JobClient summarizes these properties.
> >>>>
> >>>> It is good to have a discussion about synchronous/asynchronous
> >>>> submission now to have a complete execution picture.
> >>>>
> >>>> I thought we submit streaming queries mostly async and just wait for
> the
> >>>> successful submission. If we block for streaming queries, how can we
> >>>> collect() or print() results?
> >>>>
> >>>> Also, if we block for streaming queries, we could never support
> >>>> multiline files. Because the first INSERT INTO would block the further
> >>>> execution.
> >>>>
> >>>> If we decide to block entirely on streaming queries, we need the async
> >>>> execution methods in the design already. However, I would rather go
> for
> >>>> non-blocking streaming queries. Also with the `EMIT STREAM` key word
> in
> >>>> mind that we might add to SQL statements soon.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
> >>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
> >>>>
> >>>> On 25.03.20 16:30, godfrey he wrote:
> >>>>> Hi Timo,
> >>>>>
> >>>>> Thanks for the updating.
> >>>>>
> >>>>> Regarding to "multiline statement support", I'm also fine that
> >>>>> `TableEnvironment.executeSql()` only supports single line statement,
> >> and
> >>>> we
> >>>>> can support multiline statement later (needs more discussion about
> >> this).
> >>>>>
> >>>>> Regarding to "StatementSet.explian()", I don't have strong opinions
> >> about
> >>>>> that.
> >>>>>
> >>>>> Regarding to "TableResult.getJobClient()", I think it's unnecessary.
> >> The
&g

Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-30 Thread Jark Wu
Hi all,

Thanks for the feedbacks.

It seems that we have a conclusion to put the version into the factory
identifier. I'm also fine with this.
If we have this outcome, the Factory#factoryVersion interface is not
needed anymore, which simplifies the learning cost of the new factory.
We may need to update FLIP-95 and re-vote for it? cc @Timo Walther


kafka  => kafka for 0.11+ versions; we don't add a "-universal" suffix, because
the meaning of "universal" is not easy to understand.
kafka-0.11 => kafka for 0.11 version
kafka-0.10 => kafka for 0.10 version
elasticsearch-6 => elasticsearch for 6.x versions
elasticsearch-7 => elasticsearch for 7.x versions
hbase-1.4 => hbase for 1.4.x versions
jdbc
filesystem

We use "-" as the version delimiter to make the identifiers more consistent.
This is not enforced; users can also use other delimiters or no
delimiter at all.
But this can be a guideline in the Javadoc of Factory, to make the connector
ecosystem more consistent.
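
To make it concrete, a factory would then only expose the versioned identifier
(rough sketch, assuming the FLIP-95 factory interface):

public class Kafka011DynamicTableFactory /* implements DynamicTableSourceFactory */ {

    // The version is folded into the identifier, so no separate
    // factoryVersion() method is needed.
    public String factoryIdentifier() {
        return "kafka-0.11";
    }
}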

What do you think?



Regarding "connector.property-version":

Hi @Dawid Wysakowicz, the new factories are
designed not to read the current properties.
All the current properties are routed to the old factories if they
use "connector.type"; otherwise, the properties are routed to the new factories.

If I understand correctly, "connector.property-version" is attached
implicitly by the system, not set manually by users.
For example, the framework should add "connector.property-version=1" to the
properties when processing a DDL statement.
I'm fine with adding "connector.property-version=1" when processing a DDL
statement, but I think it's also fine if we don't,
because this can be done in the future if needed, and the default version can
be 1.

Best,
Jark

On Tue, 31 Mar 2020 at 00:36, Jark Wu  wrote:

> Hi all,
>
> Thanks for the feedbacks.
>
> It seems that we have a conclusion to put the version into the factory
> identifier. I'm also fine with this.
> If we have this outcome, the interface of Factory#factoryVersion is not
> needed anymore, this can simplify the learning cost of new factory.
> We may need to update FLIP-95 and re-vote for it? cc @Timo Walther
> 
>
> Btw, I would like to use "_" instead of "-" as the version delimiter,
> because "-" looks like minus and may confuse users, e.g. "elasticsearch-6".
> This is not forced, but should be a guilde in the Javadoc of Factory.
> I propose to use the following identifiers for existing connectors,
>
> kafka  => kafka for 0.11+ versions, we don't suffix "-universal", because
> the meaning of "universal" not easy to understand.
> kafka-0.11 => kafka for 0.11 version
> kafka-0.10 => kafka for 0.10 version
> elasticsearch-6 => elasticsearch for 6.x versions
> elasticsearch-7 => elasticsearch for 7.x versions
> hbase-1.4 => hbase for 1.4.x versions
> jdbc
> filesystem
>
> We use "-" as the version delimiter to make them to be more consistent.
> This is not forces, users can also use other delimiters or without
> delimiter.
> But this can be a guilde in the Javadoc of Factory, to make the connector
> ecosystem to be more consistent.
>
> What do you think?
>
> 
>
> Regarding "connector.property-version":
>
> Hi @Dawid Wysakowicz  , the new fatories are
> designed not support to read current properties.
> All the current properties are routed to the old factories if they are
> using "connector.type". Otherwise, properties are routed to new factories.
>
> If I understand correctly, the "connector.property-version" is attched
> implicitly by system, not manually set by users.
> For example, the framework should add "connector.property-version=1" to
> properties when processing DDL statement.
> I'm fine to add a "connector.property-version=1" when processing DDL
> statement, but I think it's also fine if we don't,
> because this can be done in the future if need and the default version can
> be 1.
>
> Best,
> Jark
>
>
>
>
>
> On Mon, 30 Mar 2020 at 23:21, Dawid Wysakowicz 
> wrote:
>
>> Hi all,
>>
>> I like the overall design of the FLIP.
>>
>> As for the withstanding concerns. I kind of like the approach to put the
>> version into the factory identifier. I think it's the cleanest way to
>> say that this version actually applies to the connector itself and not
>> to the system it connects to. BTW, I think the outcome of this
>> discussion will affect interfaces described in FLIP-95. If we put the
>> version into the functionIdentifier, do we need Factory#factoryVersion?
>>
>> I also think it does 

Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-30 Thread Jark Wu
 will
> >> be easier. Especially we have many connectors,
> >> and some of the need version property required, and some of them not.
> >>
> >> Regarding Jingsong's suggestion,
> >> IMO, it's a very good complement to the FLIP. Distinguishing
> >> properties for source and sink can be very useful, and
> >> also this will make the connector property more precise.
> >> We are also sick of this for now, we cannot know whether a DDL is a
> >> source or sink unless we look through all queries where
> >> the table is used.
> >> Even more, some of the required properties are only required for
> >> source, bug we cannot leave it blank for sink, and vice versa.
> >> I think we can also add a type for dimension tables except source and
> >> sink.
> >>
> >> Kurt Young mailto:ykt...@gmail.com>> 于2020年3月30日
> >> 周一 下午8:16写道:
> >>
> >>  > It's not possible for the framework to throw such exception.
> >> Because the
> >> framework doesn't know what versions do the connector support.
> >>
> >> Not really, we can list all valid connectors framework could
> >> found. E.g.
> >> user mistyped 'kafka-0.x', the error message will looks like:
> >>
> >> we don't have any connector named "kafka-0.x", but we have:
> >> FileSystem
> >> Kafka-0.10
> >> Kafka-0.11
> >> ElasticSearch6
> >> ElasticSearch7
> >>
> >> Best,
> >> Kurt
> >>
> >>
> >> On Mon, Mar 30, 2020 at 5:11 PM Jark Wu  >> <mailto:imj...@gmail.com>> wrote:
> >>
> >>  > Hi Kurt,
> >>  >
> >>  > > 2) Lists all available connectors seems also quite
> >> straightforward, e.g
> >>  > user provided a wrong "kafka-0.8", we tell user we have
> >> candidates of
> >>  > "kakfa-0.11", "kafka-universal"
> >>  > It's not possible for the framework to throw such exception.
> >> Because the
> >>  > framework doesn't know what versions do the connector support.
> >> All the
> >>  > version information is a blackbox in the identifier. But with
> >>  > `Factory#factoryVersion()` interface, we can know all the
> >> supported
> >>  > versions.
> >>  >
> >>  > > 3) I don't think so. We can still treat it as the same
> >> connector but with
> >>  > different versions.
> >>  > That's true but that's weird. Because from the plain DDL
> >> definition, they
> >>  > look like different connectors with different "connector"
> >> value, e.g.
> >>  > 'connector=kafka-0.8', 'connector=kafka-0.10'.
> >>  >
> >>  > > If users don't set any version, we will use "kafka-universal"
> >> instead.
> >>  > The behavior is inconsistent IMO.
> >>  > That is a long term vision when there is no kafka clusters
> >> with <0.11
> >>  > version.
> >>  > At that point, "universal" is the only supported version in Flink
> >> and the
> >>  > "version" key can be optional.
> >>  >
> >>  > -
> >>  >
> >>  > Hi Jingsong,
> >>  >
> >>  > > "version" vs "kafka.version"
> >>  > I though about it. But if we prefix "kafka" to version, we should
> >> prefix
> >>  > "kafka" for all other property keys, because they are all kafka
> >> specific
> >>  > options.
> >>  > However, that will make the property set verbose, see rejected
> >> option#2 in
> >>  > the FLIP.
> >>  >
> >>  > > explicitly separate options for source and sink
> >>  > That's a good topic. It's good to have a guideline for the new
> >> property
> >>  > keys.
> >>  > I'm fine with adding a "source"/"sink" prefix for some connector
> >> keys.
> >>  > Actually, we already do this in some connectors, e.g. jdbc and
> >> hbase.
> >>  >
> >>  > Best,
> >>  > Jark
> >>  >

Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-30 Thread Jark Wu
Hi Kurt,

> 2) Listing all available connectors also seems quite straightforward, e.g.
the user provided a wrong "kafka-0.8", we tell the user we have candidates of
"kafka-0.11", "kafka-universal"
It's not possible for the framework to throw such an exception. Because the
framework doesn't know what versions the connector supports. All the
version information is a blackbox in the identifier. But with
`Factory#factoryVersion()` interface, we can know all the supported
versions.

> 3) I don't think so. We can still treat it as the same connector but with
different versions.
That's true, but it's weird. From the plain DDL definition, they
look like different connectors with different "connector" values, e.g.
'connector=kafka-0.8', 'connector=kafka-0.10'.
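
A minimal DDL sketch of the two styles under discussion (the table schema,
topic, and option values are illustrative placeholders only, not keys fixed by
the FLIP):

  -- style 1: version folded into the connector identifier
  CREATE TABLE kafka_table (id BIGINT) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'my_topic'
  );

  -- style 2: a plain "connector" key plus a separate "version" key
  CREATE TABLE kafka_table (id BIGINT) WITH (
    'connector' = 'kafka',
    'version' = '0.11',
    'topic' = 'my_topic'
  );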

> If users don't set any version, we will use "kafka-universal" instead.
The behavior is inconsistent IMO.
That is a long-term vision for when there are no Kafka clusters with a
version below 0.11.
At that point, "universal" will be the only supported version in Flink and the
"version" key can be optional.

-

Hi Jingsong,

> "version" vs "kafka.version"
I thought about it. But if we prefix "kafka" to version, we should prefix
"kafka" for all other property keys, because they are all kafka specific
options.
However, that will make the property set verbose, see rejected option#2 in
the FLIP.

> explicitly separate options for source and sink
That's a good topic. It's good to have a guideline for the new property
keys.
I'm fine with adding a "source"/"sink" prefix for some connector keys.
Actually, we already do this in some connectors, e.g. jdbc and hbase.
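
To illustrate that guideline, here is a rough DDL sketch using the example keys
mentioned in this thread ("source.startup-mode", "sink.partitioner"); the schema
and option values are made up, and the finally adopted key names may differ:

  CREATE TABLE kafka_table (id BIGINT) WITH (
    'connector' = 'kafka',
    'topic' = 'my_topic',                       -- shared by source and sink
    'source.startup-mode' = 'earliest-offset',  -- source-only option (example key)
    'sink.partitioner' = 'round-robin'          -- sink-only option (example key)
  );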

Best,
Jark

On Mon, 30 Mar 2020 at 16:36, Jingsong Li  wrote:

> Thanks Jark for the proposal.
>
> +1 to the general idea.
>
> For "version", what about "kafka.version"? It is obvious to know its
> meaning.
>
> And I'd like to start a new topic:
> Should we need to explicitly separate source from sink?
> With the development of batch and streaming, more and more connectors have
> both source and sink.
>
> So should we set a rule for table properties:
> - properties for both source and sink: without prefix, like "topic"
> - properties for source only: with "source." prefix, like
> "source.startup-mode"
> - properties for sink only: with "sink." prefix, like "sink.partitioner"
>
> What do you think?
>
> Best,
> Jingsong Lee
>
> On Mon, Mar 30, 2020 at 3:56 PM Jark Wu  wrote:
>
> > Hi Kurt,
> >
> > That's good questions.
> >
> > > the meaning of "version"
> > There are two versions in the old design. One is the property version
> > "connector.property-version", which can be used for backward
> compatibility.
> > The other one is "connector.version", which defines the version of the
> external
> > system, e.g. "0.11" for Kafka, "6" or "7" for ES.
> > In this proposal, the "version" is the previous "connector.version". The
> > "connector.property-version" is not introduced in the new design.
> >
> > > how to keep the old capability which can evolve connector properties
> > The "connector.property-version" is an optional key in the old design and
> > is never bumped up.
> > I'm not sure how "connector.property-version" should work in the initial
> > design. Maybe @Timo Walther  has more knowledge on
> > this.
> > But for the new properties, every option should be expressed as a
> > `ConfigOption`, which provides the `withDeprecatedKeys(...)` method to easily
> > support evolving keys.
> >
> > > a single keys instead of two, e.g. "kafka-0.11", "kafka-universal"?
> > There are several benefits I can see in using a separate "version" key:
> > 1) it's more readable to separate them into different keys, because they
> > are orthogonal concepts.
> > 2) the planner can give all the available versions in the exception
> message,
> > if the user uses a wrong version (this is often reported on the user ML).
> > 3) If we use "kafka-0.11" as the connector identifier, we may have to write
> > full documentation for each version, because they would be different
> > "connectors"?
> > IMO, the 0.10, 0.11, etc. Kafka connectors are actually the same
> connector
> > but with different "client jar" versions;
> > they share all the same supported property keys and should reside
> > together.
> > 4) IMO, the future vision is version-free. At some point in the fut

Re: [VOTE] FLIP-95: New TableSource and TableSink interfaces

2020-03-30 Thread Jark Wu
+1 from my side.

Thanks Timo for driving this.

Best,
Jark

On Mon, 30 Mar 2020 at 15:36, Timo Walther  wrote:

> Hi all,
>
> I would like to start the vote for FLIP-95 [1], which has been discussed and
> has reached a consensus in the discussion thread [2].
>
> The vote will be open until April 2nd (72h), unless there is an
> objection or not enough votes.
>
> Thanks,
> Timo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> [2]
>
> https://lists.apache.org/thread.html/r03cbce8996fd06c9b0406c9ddc0d271bd456f943f313b9261fa061f9%40%3Cdev.flink.apache.org%3E
>

