Question about SQL gateway

2022-10-10 Thread Ww J
Hi,

I submit a stream job from the SQL gateway. The stream job keeps outputting
results to the SQL gateway. If the SQL gateway restarts or crashes, the stream
job will continue running. After the SQL gateway restarts, how can I get the
results of the stream job?

Thanks.

Jack

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Qingsheng Ren
Thanks for the details Chesnay!

By “alias” I mean to respect the original definition made in FLIP-33 for 
numRecordsOut, which is the number of records written to the external system, 
and keep numRecordsSend as the same value as numRecordsOut for compatibility.

I think keeping numRecordsOut for the output to the external system is more 
intuitive to end users because in most cases the metric of data flow output is 
more essential. I agree with you that a new metric is required, but considering 
compatibility and users’ intuition I prefer to keep the initial definition of 
numRecordsOut in FLIP-33 and name a new metric for sink writer’s output to 
downstream operators. This might be against consistency with metrics in other 
operators in Flink but maybe it’s acceptable to have the sink as a special case.

Best,
Qingsheng
On Oct 10, 2022, 19:13 +0800, Chesnay Schepler wrote:
> > I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut
>
> But that's not possible. If it were that simple there would have never been a 
> need to introduce another metric in the first place.
>
> It's a rather fundamental issue with how the new sinks work, in that they 
> emit data to the external system (usually considered as "numRecordsOut" of 
> sinks) while _also_ sending data to a downstream operator (usually considered 
> as "numRecordsOut" of tasks).
> The original issue was that the numRecordsOut of the sink counted both (which 
> is completely wrong).
>
> A new metric was always required; otherwise you inevitably end up breaking 
> some semantic.
> Adding a new metric for what the sink writes to the external system is, for 
> better or worse, more consistent with how these metrics usually work in Flink.
>
> On 10/10/2022 12:45, Qingsheng Ren wrote:
> > Thanks everyone for joining the discussion!
> >
> > > Do you have any idea what has happened in the process here?
> >
> > The discussion in this PR [1] shows some details and could be helpful to 
> > understand the original motivation of the renaming. We do have a test case 
> > for guarding metrics but unfortunately the case was also modified so the
> > defense was broken.
> >
> > I think the reason why both the developer and the reviewer forgot to 
> > trigger a discussion and gave a green pass on the change is that metrics
> > are quite “trivial” to be noticed as public APIs. As mentioned by Martijn I 
> > couldn’t find a place noting that metrics are public APIs and should be 
> > treated carefully while contributing and reviewing.
> >
> > IMHO three actions could be made to prevent this kind of changes in the 
> > future:
> >
> > a. Add test case for metrics (which we already have in SinkMetricsITCase)
> > b. We emphasize that any public-interface breaking changes should be 
> > proposed by a FLIP or discussed in mailing list, and should be listed in 
> > the release note.
> > c. We remind contributors and reviewers about what should be considered as 
> > public API, and include metric names in it.
> >
> > For b and c these two pages [2][3] might be proper places.
> >
> > About the patch to revert this, it looks like we have a consensus on 1.16. 
> > As of 1.15 I think it’s worthy to trigger a minor version. I didn’t see 
> > complaints about this for now so it should be OK to save the situation 
> > asap. I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut 
> > considering that some users may have already adapted their
> > system to the new naming, and have another internal metric for reflecting 
> > number of outgoing committable batches (actually the numRecordsIn of sink 
> > committer operator should be carrying this info already).
> >
> > [1] https://github.com/apache/flink/pull/18825
> > [2] https://flink.apache.org/contributing/contribute-code.html
> > [3] https://flink.apache.org/contributing/reviewing-prs.html
> >
> > Best,
> > Qingsheng
> > On Oct 10, 2022, 17:40 +0800, Xintong Song wrote:
> > > +1 for reverting these changes in Flink 1.16.
> > >
> > > For 1.15.3, can we make these metrics available via both names (numXXXOut 
> > > and numXXXSend)? In this way we don't break it for those who already 
> > > migrated to 1.15 and numXXXSend. That means we still need to change 
> > > SinkWriterOperator to use another metric name in 1.15.3, which IIUC is 
> > > internal to Flink sink.
> > >
> > > I'm overall +1 to change numXXXOut back to its original semantics. AFAIK 
> > > (from meetup / flink-forward questionnaires), most users do not migrate to
> > > a new Flink release immediately, until the next 1-2 major releases are 
> > > out.
> > >
> > > Best,
> > > Xintong
> > >
> > >
> > > > On Mon, Oct 10, 2022 at 5:26 PM Martijn Visser wrote:
> > > > > Hi Qingsheng,
> > > > >
> > > > > Do you have any idea what has happened in the process here? Do we 
> > > > > know why
> > > > > they were changed? I was under the impression that these metric names 
> > > > > were
> > > > > newly introduced due to the new in

Data type mapping

2022-10-10 Thread Pouria Pirzadeh
I am writing Flink applications in Java and I need to do data type
conversions between SQL/Table `DataType` and `TypeInformation`. According
to Flink's documentation, the type mapping methods in the TypeConversions
class are all deprecated. However, it is not clear how a developer should
perform such a mapping correctly, given that there is no straightforward
data type mapping between DataType/logical types (instances of the DataTypes
class) and `TypeInformation` (instances of the Types class).
Any suggestions/clarifications?


Preserve rowtime through join

2022-10-10 Thread Matthias Broecheler
Hey Flinksters,

I was wondering if you had any ideas for how to preserve the rowtime across
an INNER equi join so that the output can be used in a temporal join.

I've attached an example based on the TemporalJoinTest where I'm creating
two views by deduplicating underlying streams (to rates_pk and agency_pk),
then join those views on agencyid (the pk). I dedup the output to make the
primary key of rates_pk the pk of the output, but when I run
testDoubleInlineJoinState, I get the exception:

java.lang.AssertionError: Sql optimization: Assertion error: type mismatch:
ref:
TIMESTAMP(3) *ROWTIME*
input:
TIMESTAMP(3)

which indicates that the rowtime is not preserved. Intuitively, it seems
that the bigger of the two input rowtimes should be the resulting rowtime,
but I tried using GREATEST and that didn't work either.

Note, that my example works if I do two temporal joins (testDoubleJoin) or
join the underlying streams before deduplicating
(testDoubleInlineJoinStream). So, I feel like I'm not too far off. It's
just that the way I'm generating the Flink plan happens to be creating the
joined state table first and I'd love to find a way to tell Flink how to
preserve the rowtime.

Is there a way to do that?
Thanks a lot,
Matthias

--- CODE -

package org.apache.flink.table.planner.plan.stream.sql.join

import org.apache.flink.table.api.{ExplainDetail, ValidationException}
import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableTestBase}
import org.junit.Assert.{assertTrue, fail}
import org.junit.{Before, Test}

/**
 * Test temporal join in stream mode.
 */
class TemporalJoinTestExperimentShare extends TableTestBase {

  val util: StreamTableTestUtil = streamTestUtil()

  @Before
  def before(): Unit = {
    util.addTable(
      """
        |CREATE TABLE Orders (
        | amount INT,
        | currency STRING,
        | rowtime TIMESTAMP(3),
        | proctime AS PROCTIME(),
        | WATERMARK FOR rowtime AS rowtime
        |) WITH (
        | 'connector' = 'values'
        |)
      """.stripMargin)

    util.addTable(
      """
        |CREATE TABLE RatesHistory (
        | currency STRING,
        | rate INT,
        | agencyid INT,
        | rowtime TIMESTAMP(3),
        | WATERMARK FOR rowtime AS rowtime
        |) WITH (
        | 'connector' = 'values'
        |)
      """.stripMargin)

    util.addTable(
      """
        |CREATE TABLE AgencyHistory (
        | agencyid INT,
        | name STRING,
        | rowtime TIMESTAMP(3),
        | WATERMARK FOR rowtime AS rowtime
        |) WITH (
        | 'connector' = 'values'
        |)
      """.stripMargin)

    // Deduplicate RatesHistory: keep the latest row per currency.
    util.addTable(
      " CREATE VIEW rates_pk AS SELECT currency, rate, agencyid, rowtime FROM " +
        "  (SELECT *, " +
        "  ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime DESC) AS rowNum " +
        "   FROM RatesHistory" +
        "  ) T " +
        "  WHERE rowNum = 1")

    // Deduplicate AgencyHistory: keep the latest row per agencyid.
    util.addTable(
      " CREATE VIEW agency_pk AS SELECT agencyid, name, rowtime FROM " +
        "  (SELECT *, " +
        "  ROW_NUMBER() OVER (PARTITION BY agencyid ORDER BY rowtime DESC) AS rowNum " +
        "   FROM AgencyHistory" +
        "  ) T " +
        "  WHERE rowNum = 1")

    // Join the underlying streams with a time-bounded condition on rowtime,
    // then deduplicate per currency.
    util.addTable(
      " CREATE VIEW rates_and_agency_pk_stream AS SELECT currency, rate, rowtime, agencyname FROM " +
        "  (SELECT *, " +
        "  ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime DESC) AS rowNum " +
        "   FROM (SELECT r.currency, r.rate, r.rowtime, a.name AS agencyname " +
        "FROM RatesHistory AS r JOIN AgencyHistory AS a ON r.agencyid = a.agencyid " +
        "AND r.rowtime >= a.rowtime AND r.rowtime <= a.rowtime + INTERVAL '999' YEAR(3)) X " +
        "  ) T " +
        "  WHERE rowNum = 1")

    // Join the two deduplicated views on agencyid, then deduplicate per currency.
    // This is the variant used by testDoubleInlineJoinState.
    util.addTable(
      " CREATE VIEW rates_and_agency_pk_state AS SELECT currency, rate, rowtime, agencyname FROM " +
        "  (SELECT *, " +
        "  ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime DESC) AS rowNum " +
        "   FROM (SELECT r.currency, r.rate, r.rowtime as rowtime, a.name AS agencyname " +
        "FROM rates_pk AS r JOIN agency_pk AS a ON r.agencyid = a.agencyid) X " +
        "  ) T " +
        "  WHERE rowNum = 1")

  }

  @Test
  def testDoubleInlineJoinStream(): Unit = {
    val sqlQuery = "SELECT * " +
      "FROM Orders AS o JOIN " +
      "rates_and_agency_pk_stream " +
      "FOR SYSTEM_TIME AS OF o.rowtime AS r " +
      "ON o.currency = r.currency "

    println(util.tableEnv.sqlQuery(sqlQuery).explain())
  }

  @Test
  def testDoubleInlineJoinState(): Unit = {
    val sqlQuery = "SELECT * " +
      "FROM Orders AS o JOIN " +
      "rates_and_agency_pk_state " +
      "FOR SYSTEM_TIME AS OF o.rowtime AS r " +
      "ON o.currency = r.currency "

    println(util.tableEnv.sqlQuery(sqlQuery).explain())
  }

  @Test
  def testDoubleJoin(): Unit = {
val sqlQuery = "SELECT * " +
  "FROM Orders AS o JOIN " +
  "rates_pk " +
  "FOR SYSTEM_TIME AS 

Re: Deserialize avro message without reader schema.

2022-10-10 Thread Yaroslav Tkachenko
Hi Sucheth,

The short answer is no, when deserializing Avro messages you have to
provide the schema somehow, either directly or using the Schema Registry.
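
To illustrate why a schema is always needed, here is a minimal sketch that assumes the messages were produced with Confluent's Schema Registry serializer (the thread does not say which registry is in use). In that wire format each Kafka record value starts with a magic byte 0 and a 4-byte big-endian schema ID, followed by the Avro binary payload. The payload itself carries no field names or types, so the consumer has to resolve the ID to a schema before decoding. The class and method names below are hypothetical and use only the JDK.

```java
import java.nio.ByteBuffer;

/**
 * Sketch only: splits a Kafka record value into the schema ID and the Avro
 * payload, assuming the Confluent wire format (magic byte + 4-byte schema ID).
 * The class name and methods are illustrative, not part of any library.
 */
final class ConfluentAvroHeader {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4 bytes of schema ID

    /** Returns the schema ID stored in the 4 bytes after the magic byte. */
    static int schemaId(byte[] value) {
        if (value == null || value.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Value is too short to contain the header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte");
        }
        return buffer.getInt();
    }

    /** Returns the Avro-encoded bytes that follow the header. */
    static ByteBuffer payload(byte[] value) {
        schemaId(value); // validates length and magic byte
        return ByteBuffer.wrap(value, HEADER_LENGTH, value.length - HEADER_LENGTH).slice();
    }
}
```

With the ID in hand, a registry-aware deserializer fetches the writer schema on the consumer's behalf, so the application does not have to ship the schema itself. Without a registry, the schema has to be supplied directly to the Avro reader.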

On Mon, Oct 10, 2022 at 10:00 AM Sucheth S  wrote:

> Hi,
>
> I'm trying to deserialize avro messages from the kafka topic as a
> consumer.
>
> As a kafka consumer, I do not want to hold the reader schema on my end.
> Is there a way to deserialize avro messages to GenericRecord
> without providing the reader schema ?
>
>
> Regards,
> Sucheth Shivakumar
> website : https://sucheths.com
> mobile : +1(650)-576-8050
> San Mateo, United States
>


Re: Deserialize avro message without reader schema.

2022-10-10 Thread Антон
Hi,

What do you mean by holding the schema? You don't have to hold it, as the schema registry does it for you.

On 10 October 2022, 20:00, Sucheth S wrote:

> Hi,
>
> I'm trying to deserialize avro messages from the kafka topic as a consumer.
>
> As a kafka consumer, I do not want to hold the reader schema on my end.
> Is there a way to deserialize avro messages to GenericRecord
> without providing the reader schema ?
>
> Regards,
> Sucheth Shivakumar
> website : https://sucheths.com
> mobile : +1(650)-576-8050
> San Mateo, United States

Deserialize avro message without reader schema.

2022-10-10 Thread Sucheth S
Hi,

I'm trying to deserialize avro messages from the kafka topic as a consumer.

As a kafka consumer, I do not want to hold the reader schema on my end.
Is there a way to deserialize avro messages to GenericRecord
without providing the reader schema ?


Regards,
Sucheth Shivakumar
website : https://sucheths.com
mobile : +1(650)-576-8050
San Mateo, United States


Re: Window state size with global window and custom trigger

2022-10-10 Thread Alexis Sarda-Espinosa
Thanks for the confirmation :)

Regards,
Alexis.

On Sun, 9 Oct 2022, 10:37 Hangxiang Yu wrote:

> Hi, Alexis.
> I think you are right. It also applies to a global window with a custom
> trigger.
> If you apply a ReduceFunction or AggregateFunction, the window state is
> usually smaller than with a ProcessWindowFunction, because only the
> aggregated value is stored. This also holds for global windows.
> Of course, the state size of a global window also depends on how you
> implement your trigger.
> BTW, we often use TTL to reduce the state size of the global window.
> Hope this helps.
>
>
> On Sat, Oct 8, 2022 at 4:49 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I found an SO thread that clarifies some details of window state size
>> [1]. I would just like to confirm that this also applies when using a
>> global window with a custom trigger.
>>
>> The reason I ask is that the TriggerResult API is meant to cover all
>> supported scenarios, so FIRE vs FIRE_AND_PURGE is relevant, for example,
>> for a ProcessWindowFunction that holds all input records until it fires.
>> However, I assume there would be no distinction if I use a
>> (Rich)AggregateFunction, regardless of window type (global vs timed), but
>> I'd like to be sure.
>>
>> Regards,
>> Alexis.
>>
>> [1]
>> https://stackoverflow.com/questions/55247668/flink-window-state-size-and-state-management
>>
>>
>
> --
> Best,
> Hangxiang.
>
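
The state-size point discussed in this thread can be sketched with a toy model. This is not Flink code: `BufferingState` and `ReducingState` are hypothetical names that only mimic the difference between keeping every input until the window fires (as a ProcessWindowFunction requires) and keeping a single running aggregate (as with a ReduceFunction or AggregateFunction).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Toy model of the two kinds of window state; all names are illustrative. */
final class WindowStateSketch {

    /** Keeps every element, as needed when the window function must see all inputs. */
    static final class BufferingState<T> {
        private final List<T> elements = new ArrayList<>();

        void add(T element) {
            elements.add(element);
        }

        /** Number of values currently held in state. */
        int storedValues() {
            return elements.size();
        }

        /** Models purging the window contents after firing. */
        void purge() {
            elements.clear();
        }
    }

    /** Keeps one running aggregate that is updated on every incoming element. */
    static final class ReducingState<T> {
        private final BinaryOperator<T> reducer;
        private T aggregate; // null until the first element arrives

        ReducingState(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(T element) {
            aggregate = (aggregate == null) ? element : reducer.apply(aggregate, element);
        }

        T get() {
            return aggregate;
        }

        /** Number of values currently held in state: at most one. */
        int storedValues() {
            return aggregate == null ? 0 : 1;
        }
    }
}
```

Feeding the same elements into both shows the buffering variant growing with the input while the reducing variant stays at one stored value. In the buffering model, whether the contents are purged when the window fires decides how long the elements stay around, which is why the trigger implementation matters for a global window.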


Re: videos Flink Forward San Francisco 2022

2022-10-10 Thread Martin
+1

On 10 October 2022 at 14:39:28 CEST, "guenterh.lists" wrote:
>really very sad - as far as I know this happens for the first time, attitude 
>of new Ververica?
>
>Hopefully immerok may resume the open mentality of data artisans.
>
>Günter
>
>On 10.10.22 11:26, Martijn Visser wrote:
>> Hi Günter,
>> 
>> I've understood that only the keynotes were recorded and not the other 
>> sessions.
>> 
>> Best regards,
>> 
>> Martijn
>> 
>> On Sun, Oct 9, 2022 at 4:10 PM guenterh.lists  
>> wrote:
>> 
>> Sorry if this question was already posted
>> 
>> By now only a few videos of the conference were published (mainly the
>> keynotes)
>> https://www.youtube.com/playlist?list=PLDX4T_cnKjD10qp1y2B4sLNW5KL_P6RuB
>> 
>> Are the other presentations not going to be published?
>> 
>> Günter
>> 

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Xingbo Huang
+1 for reverting these changes in Flink 1.16, so I will cancel 1.16.0-rc1.
+1 for `numXXXSend` as the alias of `numXXXOut` in 1.15.3.

Best,
Xingbo

On Mon, Oct 10, 2022 at 19:13, Chesnay Schepler wrote:

> > I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut
>
> But that's not possible. If it were that simple there would have never
> been a need to introduce another metric in the first place.
>
> It's a rather fundamental issue with how the new sinks work, in that they
> emit data to the external system (usually considered as "numRecordsOut" of
> sinks) while _also_ sending data to a downstream operator (usually
> considered as "numRecordsOut" of tasks).
> The original issue was that the numRecordsOut of the sink counted both
> (which is completely wrong).
>
> A new metric was always required; otherwise you inevitably end up breaking
> *some* semantic.
> Adding a new metric for what the sink writes to the external system is,
> for better or worse, more consistent with how these metrics usually work in
> Flink.
>
> On 10/10/2022 12:45, Qingsheng Ren wrote:
>
> Thanks everyone for joining the discussion!
>
> > Do you have any idea what has happened in the process here?
>
> The discussion in this PR [1] shows some details and could be helpful to
> understand the original motivation of the renaming. We do have a test case
> for guarding metrics but unfortunately the case was also modified so the
> defense was broken.
>
> I think the reason why both the developer and the reviewer forgot to
> trigger a discussion and gave a green pass on the change is that metrics
> are quite “trivial” to be noticed as public APIs. As mentioned by Martijn I
> couldn’t find a place noting that metrics are public APIs and should be
> treated carefully while contributing and reviewing.
>
> IMHO three actions could be made to prevent this kind of changes in the
> future:
>
> a. Add test case for metrics (which we already have in SinkMetricsITCase)
> b. We emphasize that any public-interface breaking changes should be
> proposed by a FLIP or discussed in mailing list, and should be listed in
> the release note.
> c. We remind contributors and reviewers about what should be considered as
> public API, and include metric names in it.
>
> For b and c these two pages [2][3] might be proper places.
>
> About the patch to revert this, it looks like we have a consensus on 1.16.
> As of 1.15 I think it’s worthy to trigger a minor version. I didn’t see
> complaints about this for now so it should be OK to save the situation
> asap. I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut
> considering that some users may have already adapted their
> system to the new naming, and have another internal metric for reflecting
> number of outgoing committable batches (actually the numRecordsIn of sink
> committer operator should be carrying this info already).
>
> [1] https://github.com/apache/flink/pull/18825
> [2] https://flink.apache.org/contributing/contribute-code.html
> [3] https://flink.apache.org/contributing/reviewing-prs.html
>
> Best,
> Qingsheng
> On Oct 10, 2022, 17:40 +0800, Xintong Song wrote:
>
> +1 for reverting these changes in Flink 1.16.
>
> For 1.15.3, can we make these metrics available via both names (numXXXOut
> and numXXXSend)? In this way we don't break it for those who already
> migrated to 1.15 and numXXXSend. That means we still need to change
> SinkWriterOperator to use another metric name in 1.15.3, which IIUC is
> internal to Flink sink.
>
> I'm overall +1 to change numXXXOut back to its original semantics. AFAIK
> (from meetup / flink-forward questionnaires), most users do not migrate to a
> new Flink release immediately, until the next 1-2 major releases are out.
>
> Best,
>
> Xintong
>
>
> On Mon, Oct 10, 2022 at 5:26 PM Martijn Visser 
> wrote:
>
>> Hi Qingsheng,
>>
>> Do you have any idea what has happened in the process here? Do we know why
>> they were changed? I was under the impression that these metric names were
>> newly introduced due to the new interfaces and because it still depends on
>> each connector implementing these.
>>
>> Sidenote: metric names are not mentioned in the FLIP process as a public
>> API. Might make sense to have a separate follow-up to add that to the list
>> (I do think we should list them there).
>>
>> +1 for reverting this and make this change in Flink 1.16
>>
>> I'm not in favour of releasing a Flink 1.15.3 with this change: I think
>> the
>> impact is too big for a patch version, especially given how long Flink
>> 1.15
>> is already out there.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Mon, Oct 10, 2022 at 11:13 AM Leonard Xu  wrote:
>>
>> > Thanks Qingsheng for starting this thread.
>> >
>> > +1 on reverting sink metric name and releasing 1.15.3 to fix this
>> > inconsistent behavior.
>> >
>> >
>> > Best,
>> > Leonard
>> >
>> >
>> >
>> >
>> >
>> > On Oct 10, 2022, at 3:06 PM, Jark Wu wrote:
>> >
>> > Thanks for discovering this problem, Qingsheng!
>> >
>>

Re: videos Flink Forward San Francisco 2022

2022-10-10 Thread guenterh.lists
That is really very sad. As far as I know this is the first time this has
happened. Is this the attitude of the new Ververica?

Hopefully immerok can resume the open mentality of data artisans.

Günter

On 10.10.22 11:26, Martijn Visser wrote:

Hi Günter,

I've understood that only the keynotes were recorded and not the other 
sessions.


Best regards,

Martijn

On Sun, Oct 9, 2022 at 4:10 PM guenterh.lists 
 wrote:


Sorry if this question was already posted

By now only a few videos of the conference were published (mainly the
keynotes)
https://www.youtube.com/playlist?list=PLDX4T_cnKjD10qp1y2B4sLNW5KL_P6RuB

Are the other presentations not going to be published?

Günter


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Chesnay Schepler

> I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut

But that's not possible. If it were that simple there would have never 
been a need to introduce another metric in the first place.


It's a rather fundamental issue with how the new sinks work, in that 
they emit data to the external system (usually considered as 
"numRecordsOut" of sinks) while _also_ sending data to a downstream 
operator (usually considered as "numRecordsOut" of tasks).
The original issue was that the numRecordsOut of the sink counted both 
(which is completely wrong).


A new metric was always required; otherwise you inevitably end up 
breaking /some/ semantic.
Adding a new metric for what the sink writes to the external system is, 
for better or worse, more consistent with how these metrics usually work 
in Flink.
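
The distinction described above can be sketched with a toy model. This is not Flink's SinkWriterOperator or its metric groups: the class and counter names are hypothetical and only illustrate why one counter cannot serve both meanings, assuming a writer that sends records to an external system and also forwards committables to a downstream operator.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Toy model (not Flink code) of a sink writer with two distinct outputs:
 * records written to the external system and committables sent downstream.
 */
final class SinkWriterMetricsSketch {
    // Hypothetical counter: records written to the external system.
    private final AtomicLong recordsToExternalSystem = new AtomicLong();
    // Hypothetical counter: committables forwarded to the downstream operator.
    private final AtomicLong committablesToDownstream = new AtomicLong();

    /** Simulates writing one record to the external system. */
    void writeRecord(String record) {
        recordsToExternalSystem.incrementAndGet();
    }

    /** Simulates emitting one committable to the downstream operator. */
    void emitCommittable() {
        committablesToDownstream.incrementAndGet();
    }

    long recordsToExternalSystem() {
        return recordsToExternalSystem.get();
    }

    long committablesToDownstream() {
        return committablesToDownstream.get();
    }

    /** What a single shared counter would report if it counted both outputs. */
    long conflatedCount() {
        return recordsToExternalSystem.get() + committablesToDownstream.get();
    }
}
```

After three `writeRecord` calls and one `emitCommittable` call, the two separate counters read 3 and 1, while the conflated value is 4, which matches neither meaning. Whichever of the two outputs keeps the existing metric name, the other one needs a name of its own.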


On 10/10/2022 12:45, Qingsheng Ren wrote:

Thanks everyone for joining the discussion!

> Do you have any idea what has happened in the process here?

The discussion in this PR [1] shows some details and could be helpful 
to understand the original motivation of the renaming. We do have a 
test case for guarding metrics but unfortunately the case was also
modified so the defense was broken.


I think the reason why both the developer and the reviewer forgot to 
trigger a discussion and gave a green pass on the change is that
metrics are quite “trivial” to be noticed as public APIs. As mentioned 
by Martijn I couldn’t find a place noting that metrics are public APIs 
and should be treated carefully while contributing and reviewing.


IMHO three actions could be made to prevent this kind of changes in 
the future:


a. Add test case for metrics (which we already have in SinkMetricsITCase)
b. We emphasize that any public-interface breaking changes should be 
proposed by a FLIP or discussed in mailing list, and should be listed 
in the release note.
c. We remind contributors and reviewers about what should be 
considered as public API, and include metric names in it.


For b and c these two pages [2][3] might be proper places.

About the patch to revert this, it looks like we have a consensus on 
1.16. As of 1.15 I think it’s worthy to trigger a minor version. I 
didn’t see complaints about this for now so it should be OK to save 
the situation asap. I’m with Xintong’s idea to treat numXXXSend as an 
alias of numXXXOut considering that some users may have
already adapted their system to the new naming, and have another 
internal metric for reflecting number of outgoing committable batches 
(actually the numRecordsIn of sink committer operator should be 
carrying this info already).


[1] https://github.com/apache/flink/pull/18825
[2] https://flink.apache.org/contributing/contribute-code.html
[3] https://flink.apache.org/contributing/reviewing-prs.html

Best,
Qingsheng
On Oct 10, 2022, 17:40 +0800, Xintong Song wrote:

+1 for reverting these changes in Flink 1.16.

For 1.15.3, can we make these metrics available via both names 
(numXXXOut and numXXXSend)? In this way we don't break it for those 
who already migrated to 1.15 and numXXXSend. That means we still need 
to change SinkWriterOperator to use another metric name in 1.15.3, 
which IIUC is internal to Flink sink.


I'm overall +1 to change numXXXOut back to its original semantics. 
AFAIK (from meetup / flink-forward questionnaires), most users do not
migrate to a new Flink release immediately, until the next 1-2 major 
releases are out.


Best,

Xintong



On Mon, Oct 10, 2022 at 5:26 PM Martijn Visser 
 wrote:


Hi Qingsheng,

Do you have any idea what has happened in the process here? Do we
know why
they were changed? I was under the impression that these metric
names were
newly introduced due to the new interfaces and because it still
depends on
each connector implementing these.

Sidenote: metric names are not mentioned in the FLIP process as a
public
API. Might make sense to have a separate follow-up to add that to
the list
(I do think we should list them there).

+1 for reverting this and make this change in Flink 1.16

I'm not in favour of releasing a Flink 1.15.3 with this change: I
think the
impact is too big for a patch version, especially given how long
Flink 1.15
is already out there.

Best regards,

Martijn

On Mon, Oct 10, 2022 at 11:13 AM Leonard Xu 
wrote:

> Thanks Qingsheng for starting this thread.
>
> +1 on reverting sink metric name and releasing 1.15.3 to fix this
> inconsistent behavior.
>
>
> Best,
> Leonard
>
>
>
>
>
> On Oct 10, 2022, at 3:06 PM, Jark Wu wrote:
>
> Thanks for discovering this problem, Qingsheng!
>
> I'm also +1 for reverting the breaking changes.
>
> IIUC, currently, the behavior of "numXXXOut" metrics of the new
and old
> sink is inconsistent.
> We have to break one of them to have consistent behavior. Sink
V2 is an
 

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Chesnay Schepler

On 10/10/2022 11:24, Martijn Visser wrote:
> Sidenote: metric names are not mentioned in the FLIP process as a
> public API. Might make sense to have a separate follow-up to add that
> to the list (I do think we should list them there).

That's a general issue we have. There are a lot of things we _usually_
treat as a public API without having written it down, including:


 * config options (I mean _keys_, not ConfigOption members)
 * CLI
 * REST API
 * metric names
 * scripts in distribution bin/ directory


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Qingsheng Ren
Thanks everyone for joining the discussion!

> Do you have any idea what has happened in the process here?

The discussion in this PR [1] shows some details and could be helpful for
understanding the original motivation of the renaming. We do have a test case
for guarding metrics, but unfortunately the case was also modified, so the
defense was broken.

I think the reason why both the developer and the reviewer forgot to trigger a
discussion and gave a green pass on the change is that metrics are too
“trivial” to be noticed as public APIs. As mentioned by Martijn, I couldn’t find
a place noting that metrics are public APIs and should be treated carefully
while contributing and reviewing.

IMHO three actions could be taken to prevent this kind of change in the future:

a. Add test cases for metrics (which we already have in SinkMetricsITCase)
b. We emphasize that any change that breaks a public interface should be
proposed in a FLIP or discussed on the mailing list, and should be listed in
the release notes.
c. We remind contributors and reviewers about what should be considered as
public API, and include metric names in it.

For b and c these two pages [2][3] might be proper places.

About the patch to revert this, it looks like we have a consensus on 1.16. As
for 1.15, I think it’s worth triggering a minor version. I haven’t seen
complaints about this so far, so it should be OK to save the situation asap.
I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut,
considering that some users may have already adapted their systems to the new
naming, and to have another internal metric reflecting the number of outgoing
committable batches (actually the numRecordsIn of the sink committer operator
should be carrying this info already).

[1] https://github.com/apache/flink/pull/18825
[2] https://flink.apache.org/contributing/contribute-code.html
[3] https://flink.apache.org/contributing/reviewing-prs.html

Best,
Qingsheng
On Oct 10, 2022, 17:40 +0800, Xintong Song wrote:
> +1 for reverting these changes in Flink 1.16.
>
> For 1.15.3, can we make these metrics available via both names (numXXXOut and 
> numXXXSend)? In this way we don't break it for those who already migrated to 
> 1.15 and numXXXSend. That means we still need to change SinkWriterOperator to 
> use another metric name in 1.15.3, which IIUC is internal to Flink sink.
>
> I'm overall +1 to change numXXXOut back to its original semantics. AFAIK 
> (from meetup / flink-forward questionnaires), most users do not migrate to a
> new Flink release immediately, until the next 1-2 major releases are out.
>
> Best,
> Xintong
>
>
> > On Mon, Oct 10, 2022 at 5:26 PM Martijn Visser  
> > wrote:
> > > Hi Qingsheng,
> > >
> > > Do you have any idea what has happened in the process here? Do we know why
> > > they were changed? I was under the impression that these metric names were
> > > newly introduced due to the new interfaces and because it still depends on
> > > each connector implementing these.
> > >
> > > Sidenote: metric names are not mentioned in the FLIP process as a public
> > > API. Might make sense to have a separate follow-up to add that to the list
> > > (I do think we should list them there).
> > >
> > > +1 for reverting this and make this change in Flink 1.16
> > >
> > > I'm not in favour of releasing a Flink 1.15.3 with this change: I think 
> > > the
> > > impact is too big for a patch version, especially given how long Flink 
> > > 1.15
> > > is already out there.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Mon, Oct 10, 2022 at 11:13 AM Leonard Xu  wrote:
> > >
> > > > Thanks Qingsheng for starting this thread.
> > > >
> > > > +1 on reverting sink metric name and releasing 1.15.3 to fix this
> > > > inconsistent behavior.
> > > >
> > > >
> > > > Best,
> > > > Leonard
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Oct 10, 2022, at 3:06 PM, Jark Wu wrote:
> > > >
> > > > Thanks for discovering this problem, Qingsheng!
> > > >
> > > > I'm also +1 for reverting the breaking changes.
> > > >
> > > > IIUC, currently, the behavior of "numXXXOut" metrics of the new and old
> > > > sink is inconsistent.
> > > > We have to break one of them to have consistent behavior. Sink V2 is an
> > > > evolving API which is just introduced in 1.15.
> > > > I think it makes sense to break the unstable API instead of the stable 
> > > > API
> > > > which many connectors and users depend on.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > >
> > > > On Mon, 10 Oct 2022 at 11:36, Jingsong Li  
> > > > wrote:
> > > >
> > > >> Thanks for driving, Qingsheng.
> > > >>
> > > >> +1 for reverting sink metric name.
> > > >>
> > > >> We often forget that metric is also one of the important APIs.
> > > >>
> > > >> +1 for releasing 1.15.3 to fix this.
> > > >>
> > > >> Best,
> > > >> Jingsong
> > > >>
> > > >> On Sun, Oct 9, 2022 at 11:35 PM Becket Qin  
> > > >> wrote:
> > > >> >
> > > >> > Thanks for raising the discussion, Qingsheng,
> > > >> >

Re: videos Flink Forward San Francisco 2022

2022-10-10 Thread Gyula Fóra
I think everyone would be happier with the videos published on Youtube but
it's unfortunately at the discretion of the organizer.

This time they decided against it for some reason.

Gyula

On Mon, Oct 10, 2022 at 11:57 AM Martin  wrote:

> Hey,
>
> that's sad. Is it possible for future Flink Forwards to record again and
> publish all sessions?
>
> Best regards
> Martin
>
>
> On 10 October 2022 at 11:26:26 CEST, Martijn Visser
> <martijnvis...@apache.org> wrote:
>>
>> Hi Günter,
>>
>> I've understood that only the keynotes were recorded and not the other
>> sessions.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Sun, Oct 9, 2022 at 4:10 PM guenterh.lists 
>> wrote:
>>
>>> Sorry if this question was already posted
>>>
>>> By now only a few videos of the conference were published (mainly the
>>> keynotes)
>>> https://www.youtube.com/playlist?list=PLDX4T_cnKjD10qp1y2B4sLNW5KL_P6RuB
>>>
>>> Are the other presentations not going to be published?
>>>
>>> Günter
>>>
>>>


Re: videos Flink Forward San Francisco 2022

2022-10-10 Thread Martin
Hey,

that's sad. Is it possible for future Flink Forwards to record again and 
publish all sessions?

Best regards
Martin


On 10 October 2022 at 11:26:26 CEST, Martijn Visser wrote:
>Hi Günter,
>
>I've understood that only the keynotes were recorded and not the other
>sessions.
>
>Best regards,
>
>Martijn
>
>On Sun, Oct 9, 2022 at 4:10 PM guenterh.lists 
>wrote:
>
>> Sorry if this question was already posted
>>
>> By now only a few videos of the conference were published (mainly the
>> keynotes)
>> https://www.youtube.com/playlist?list=PLDX4T_cnKjD10qp1y2B4sLNW5KL_P6RuB
>>
>> Are the other presentations not going to be published?
>>
>> Günter
>>
>>


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Xintong Song
+1 for reverting these changes in Flink 1.16.

For 1.15.3, can we make these metrics available via both names (numXXXOut
and numXXXSend)? In this way we don't break it for those who already
migrated to 1.15 and numXXXSend. That means we still need to change
SinkWriterOperator to use another metric name in 1.15.3, which IIUC is
internal to Flink sink.
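
A minimal sketch of the two-name idea, in plain Python rather than Flink's metrics API. `Counter`, `MetricGroup` and `register_sink_metrics` are hypothetical stand-ins written for illustration, and `numCommittablesOut` is an assumed placeholder for whatever new name the writer-to-committer count would get. The point is only that one counter object can be exposed under both the old and the new name, so both report the same value:

```python
class Counter:
    """Minimal stand-in for a monotonically increasing metric counter."""

    def __init__(self):
        self.count = 0

    def inc(self, n=1):
        self.count += n


class MetricGroup:
    """Hypothetical registry that maps metric names to counter objects."""

    def __init__(self):
        self._metrics = {}

    def counter(self, name, counter=None):
        # Register an existing counter under `name`, or create a fresh one.
        counter = counter if counter is not None else Counter()
        self._metrics[name] = counter
        return counter

    def snapshot(self):
        return {name: c.count for name, c in self._metrics.items()}


def register_sink_metrics(group):
    # One counter object, visible under both names.
    records_out = group.counter("numRecordsOut")
    group.counter("numRecordsSend", records_out)
    # Separate counter for what the writer hands to the committer.
    # The name below is an assumption, not an agreed Flink metric name.
    committables = group.counter("numCommittablesOut")
    return records_out, committables


group = MetricGroup()
records_out, committables = register_sink_metrics(group)
records_out.inc(3)   # three records written to the external system
committables.inc(1)  # one committable passed downstream
print(group.snapshot())
```

Because both names point at the same object, dashboards built on either name keep working, while the downstream count lives under its own name.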

I'm overall +1 to change numXXXOut back to its original semantics. AFAIK
(from meetup / flink-forward questionnaires), most users do not migrate to a
new Flink release immediately, until the next 1-2 major releases are out.

Best,

Xintong



On Mon, Oct 10, 2022 at 5:26 PM Martijn Visser 
wrote:

> Hi Qingsheng,
>
> Do you have any idea what has happened in the process here? Do we know why
> they were changed? I was under the impression that these metric names were
> newly introduced due to the new interfaces and because it still depends on
> each connector implementing these.
>
> Sidenote: metric names are not mentioned in the FLIP process as a public
> API. Might make sense to have a separate follow-up to add that to the list
> (I do think we should list them there).
>
> +1 for reverting this and making this change in Flink 1.16
>
> I'm not in favour of releasing a Flink 1.15.3 with this change: I think the
> impact is too big for a patch version, especially given how long Flink 1.15
> is already out there.
>
> Best regards,
>
> Martijn
>
> On Mon, Oct 10, 2022 at 11:13 AM Leonard Xu  wrote:
>
> > Thanks Qingsheng for starting this thread.
> >
> > +1 on reverting sink metric name and releasing 1.15.3 to fix this
> > inconsistent behavior.
> >
> >
> > Best,
> > Leonard
> >
> >
> >
> >
> >
> > On Oct 10, 2022, at 3:06 PM, Jark Wu wrote:
> >
> > Thanks for discovering this problem, Qingsheng!
> >
> > I'm also +1 for reverting the breaking changes.
> >
> > IIUC, currently, the behavior of "numXXXOut" metrics of the new and old
> > sink is inconsistent.
> > We have to break one of them to have consistent behavior. Sink V2 is an
> > evolving API which is just introduced in 1.15.
> > I think it makes sense to break the unstable API instead of the stable
> > API which many connectors and users depend on.
> >
> > Best,
> > Jark
> >
> >
> >
> > On Mon, 10 Oct 2022 at 11:36, Jingsong Li 
> wrote:
> >
> >> Thanks for driving, Qingsheng.
> >>
> >> +1 for reverting sink metric name.
> >>
> >> We often forget that metric is also one of the important APIs.
> >>
> >> +1 for releasing 1.15.3 to fix this.
> >>
> >> Best,
> >> Jingsong
> >>
> >> On Sun, Oct 9, 2022 at 11:35 PM Becket Qin 
> wrote:
> >> >
> >> > Thanks for raising the discussion, Qingsheng,
> >> >
> >> > +1 on reverting the breaking changes.
> >> >
> >> > In addition, we might want to release a 1.15.3 to fix this and update
> >> the previous release docs with this known issue, so that users can
> upgrade
> >> to 1.15.3 when they hit it. It would also be good to add some backwards
> >> compatibility tests on metrics to avoid unintended breaking changes like
> >> this in the future.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On Sun, Oct 9, 2022 at 10:35 AM Qingsheng Ren 
> wrote:
> >> >>
> >> >> Hi devs and users,
> >> >>
> >> >> I’d like to start a discussion about reverting a breaking change
> about
> >> sink metrics made in 1.15 by FLINK-26126 [1] and FLINK-26492 [2].
> >> >>
> >> >> TL;DR
> >> >>
> >> >> All sink metrics with name “numXXXOut” defined in FLIP-33 are replaced
> >> by “numXXXSend” in FLINK-26126 and FLINK-26492. Considering metric names
> >> are public APIs, this is a breaking change to end users and not backward
> >> compatible. Also unfortunately this breaking change was not discussed in
> >> the mailing list before.
> >> >>
> >> >> Background
> >> >>
> >> >> As defined previously in FLIP-33 (the FLIP page has been changed so
> >> please refer to the old version [3] ), metric “numRecordsOut” is used
> for
> >> reporting the total number of output records since the sink started
> (number
> >> of records written to the external system), and similarly for
> >> “numRecordsOutPerSecond”, “numBytesOut”, “numBytesOutPerSecond” and
> >> “numRecordsOutError”. Most sinks are following this naming and
> definition.
> >> However, these metrics are ambiguous in the new Sink API as “numXXXOut”
> >> could be used by the output of SinkWriterOperator for reporting number
> of
> >> Committables delivered to SinkCommitterOperator. In order to resolve the
> >> conflict, FLINK-26126 and FLINK-26492 changed names of these metrics
> with
> >> “numXXXSend”.
> >> >>
> >> >> Necessity of reverting this change
> >> >>
> >> >> - Metric names are actually public API, as end users need to
> configure
> >> metric collecting and alerting system with metric names. Users have to
> >> reset all configurations related to affected metrics.
> >> >> - This could also affect custom and external sinks not maintained by
> >> Flink, which might have implemented with numXXXOut metrics.
> >> >> - The number of records sen

Re: videos Flink Forward San Francisco 2022

2022-10-10 Thread Martijn Visser
Hi Günter,

I've understood that only the keynotes were recorded and not the other
sessions.

Best regards,

Martijn

On Sun, Oct 9, 2022 at 4:10 PM guenterh.lists 
wrote:

> Sorry if this question was already posted
>
> By now only a few videos of the conference were published (mainly the
> keynotes)
> https://www.youtube.com/playlist?list=PLDX4T_cnKjD10qp1y2B4sLNW5KL_P6RuB
>
> Are the other presentations not going to be published?
>
> Günter
>
>


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Martijn Visser
Hi Qingsheng,

Do you have any idea what has happened in the process here? Do we know why
they were changed? I was under the impression that these metric names were
newly introduced due to the new interfaces and because it still depends on
each connector implementing these.

Sidenote: metric names are not mentioned in the FLIP process as a public
API. Might make sense to have a separate follow-up to add that to the list
(I do think we should list them there).

+1 for reverting this and making this change in Flink 1.16

I'm not in favour of releasing a Flink 1.15.3 with this change: I think the
impact is too big for a patch version, especially given how long Flink 1.15
is already out there.

Best regards,

Martijn

On Mon, Oct 10, 2022 at 11:13 AM Leonard Xu  wrote:

> Thanks Qingsheng for starting this thread.
>
> +1 on reverting sink metric name and releasing 1.15.3 to fix this
> inconsistent behavior.
>
>
> Best,
> Leonard
>
>
>
>
>
> On Oct 10, 2022, at 3:06 PM, Jark Wu wrote:
>
> Thanks for discovering this problem, Qingsheng!
>
> I'm also +1 for reverting the breaking changes.
>
> IIUC, currently, the behavior of "numXXXOut" metrics of the new and old
> sink is inconsistent.
> We have to break one of them to have consistent behavior. Sink V2 is an
> evolving API which is just introduced in 1.15.
> I think it makes sense to break the unstable API instead of the stable API
> which many connectors and users depend on.
>
> Best,
> Jark
>
>
>
> On Mon, 10 Oct 2022 at 11:36, Jingsong Li  wrote:
>
>> Thanks for driving, Qingsheng.
>>
>> +1 for reverting sink metric name.
>>
>> We often forget that metric is also one of the important APIs.
>>
>> +1 for releasing 1.15.3 to fix this.
>>
>> Best,
>> Jingsong
>>
>> On Sun, Oct 9, 2022 at 11:35 PM Becket Qin  wrote:
>> >
>> > Thanks for raising the discussion, Qingsheng,
>> >
>> > +1 on reverting the breaking changes.
>> >
>> > In addition, we might want to release a 1.15.3 to fix this and update
>> the previous release docs with this known issue, so that users can upgrade
>> to 1.15.3 when they hit it. It would also be good to add some backwards
>> compatibility tests on metrics to avoid unintended breaking changes like
>> this in the future.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Sun, Oct 9, 2022 at 10:35 AM Qingsheng Ren  wrote:
>> >>
>> >> Hi devs and users,
>> >>
>> >> I’d like to start a discussion about reverting a breaking change about
>> sink metrics made in 1.15 by FLINK-26126 [1] and FLINK-26492 [2].
>> >>
>> >> TL;DR
>> >>
>> >> All sink metrics with name “numXXXOut” defined in FLIP-33 are replaced
>> by “numXXXSend” in FLINK-26126 and FLINK-26492. Considering metric names
>> are public APIs, this is a breaking change to end users and not backward
>> compatible. Also unfortunately this breaking change was not discussed in
>> the mailing list before.
>> >>
>> >> Background
>> >>
>> >> As defined previously in FLIP-33 (the FLIP page has been changed so
>> please refer to the old version [3] ), metric “numRecordsOut” is used for
>> reporting the total number of output records since the sink started (number
>> of records written to the external system), and similarly for
>> “numRecordsOutPerSecond”, “numBytesOut”, “numBytesOutPerSecond” and
>> “numRecordsOutError”. Most sinks are following this naming and definition.
>> However, these metrics are ambiguous in the new Sink API as “numXXXOut”
>> could be used by the output of SinkWriterOperator for reporting number of
>> Committables delivered to SinkCommitterOperator. In order to resolve the
>> conflict, FLINK-26126 and FLINK-26492 changed names of these metrics with
>> “numXXXSend”.
>> >>
>> >> Necessity of reverting this change
>> >>
>> >> - Metric names are actually public API, as end users need to configure
>> metric collecting and alerting system with metric names. Users have to
>> reset all configurations related to affected metrics.
>> >> - This could also affect custom and external sinks not maintained by
>> Flink, which might have implemented with numXXXOut metrics.
>> >> - The number of records sent to external system is way more important
>> than the number of Committables sent to SinkCommitterOperator, as the
>> latter one is just an internal implementation of sink. We could have a new
>> metric name for the latter one instead.
>> >> - We could avoid splitting the project by version (like “plz use
>> numXXXOut before 1.15 and use numXXXSend after”) if we revert it ASAP,
>> considering 1.16 is still not released for now.
>> >>
>> >> As a consequence, I’d like to hear from devs and users about your
>> opinion on changing these metrics back to “numXXXOut”.
>> >>
>> >> Looking forward to your reply!
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-26126
>> >> [2] https://issues.apache.org/jira/browse/FLINK-26492
>> >> [3] FLIP-33, version 18:
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883136
>> >>
>> >> Best,
>> >> Qingsheng
>>
>
>


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Leonard Xu
Thanks Qingsheng for starting this thread.

+1 on reverting sink metric name and releasing 1.15.3 to fix this inconsistent 
behavior.


Best,
Leonard





> On Oct 10, 2022, at 3:06 PM, Jark Wu wrote:
> 
> Thanks for discovering this problem, Qingsheng!
> 
> I'm also +1 for reverting the breaking changes. 
> 
> IIUC, currently, the behavior of "numXXXOut" metrics of the new and old sink 
> is inconsistent. 
> We have to break one of them to have consistent behavior. Sink V2 is an 
> evolving API which is just introduced in 1.15. 
> I think it makes sense to break the unstable API instead of the stable API 
> which many connectors and users depend on.
> 
> Best,
> Jark
> 
> 
> 
> On Mon, 10 Oct 2022 at 11:36, Jingsong Li wrote:
> Thanks for driving, Qingsheng.
> 
> +1 for reverting sink metric name.
> 
> We often forget that metric is also one of the important APIs.
> 
> +1 for releasing 1.15.3 to fix this.
> 
> Best,
> Jingsong
> 
> On Sun, Oct 9, 2022 at 11:35 PM Becket Qin wrote:
> >
> > Thanks for raising the discussion, Qingsheng,
> >
> > +1 on reverting the breaking changes.
> >
> > In addition, we might want to release a 1.15.3 to fix this and update the 
> > previous release docs with this known issue, so that users can upgrade to 
> > 1.15.3 when they hit it. It would also be good to add some backwards 
> > compatibility tests on metrics to avoid unintended breaking changes like 
> > this in the future.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sun, Oct 9, 2022 at 10:35 AM Qingsheng Ren wrote:
> >>
> >> Hi devs and users,
> >>
> >> I’d like to start a discussion about reverting a breaking change about 
> >> sink metrics made in 1.15 by FLINK-26126 [1] and FLINK-26492 [2].
> >>
> >> TL;DR
> >>
> >> All sink metrics with name “numXXXOut” defined in FLIP-33 are replaced by 
> >> “numXXXSend” in FLINK-26126 and FLINK-26492. Considering metric names are 
> >> public APIs, this is a breaking change to end users and not backward 
> >> compatible. Also unfortunately this breaking change was not discussed in 
> >> the mailing list before.
> >>
> >> Background
> >>
> >> As defined previously in FLIP-33 (the FLIP page has been changed so please 
> >> refer to the old version [3] ), metric “numRecordsOut” is used for 
> >> reporting the total number of output records since the sink started 
> >> (number of records written to the external system), and similarly for 
> >> “numRecordsOutPerSecond”, “numBytesOut”, “numBytesOutPerSecond” and 
> >> “numRecordsOutError”. Most sinks are following this naming and definition. 
> >> However, these metrics are ambiguous in the new Sink API as “numXXXOut” 
> >> could be used by the output of SinkWriterOperator for reporting number of 
> >> Committables delivered to SinkCommitterOperator. In order to resolve the 
> >> conflict, FLINK-26126 and FLINK-26492 changed names of these metrics with 
> >> “numXXXSend”.
> >>
> >> Necessity of reverting this change
> >>
> >> - Metric names are actually public API, as end users need to configure 
> >> metric collecting and alerting system with metric names. Users have to 
> >> reset all configurations related to affected metrics.
> >> - This could also affect custom and external sinks not maintained by 
> >> Flink, which might have implemented with numXXXOut metrics.
> >> - The number of records sent to external system is way more important than 
> >> the number of Committables sent to SinkCommitterOperator, as the latter 
> >> one is just an internal implementation of sink. We could have a new metric 
> >> name for the latter one instead.
> >> - We could avoid splitting the project by version (like “plz use numXXXOut 
> >> before 1.15 and use numXXXSend after”) if we revert it ASAP, considering 
> >> 1.16 is still not released for now.
> >>
> >> As a consequence, I’d like to hear from devs and users about your opinion 
> >> on changing these metrics back to “numXXXOut”.
> >>
> >> Looking forward to your reply!
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-26126
> >> [2] https://issues.apache.org/jira/browse/FLINK-26492
> >> [3] FLIP-33, version 18:
> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883136
> >>
> >> Best,
> >> Qingsheng



RE: Re:Question about Flink Broadcast State event ordering

2022-10-10 Thread Qing Lim
Thanks both for your advice, I will give them a try!

From: Schwalbe Matthias 
Sent: 10 October 2022 08:35
To: 仙路尽头谁为峰 ; Qing Lim 
Cc: User 
Subject: RE: Re:Question about Flink Broadcast State event ordering

Hi Qing again,

Another point to consider: broadcast streams are subject to watermarking, i.e.

  *   You can wait to process the broadcast records until after the watermark 
has passed, then
  *   order those records by time
  *   keep all broadcast records for which the watermark has not yet passed in 
some extra data structure without processing them
  *   that also means the broadcast stream should not be configured to use 
watermark idleness, or you need to manually implement watermark processing logic 
for the idling broadcast stream

This sounds a little complicated, but can definitely be done (I do that all the 
time 😊 )

Best regards

Thias



From: 仙路尽头谁为峰 <xljtswf2...@163.com>
Sent: Wednesday, October 5, 2022 10:13 AM
To: Qing Lim <q@mwam.com>
Cc: User <user@flink.apache.org>
Subject: [SPAM] Reply: Re:Question about Flink Broadcast State event ordering


Hi Qing:
  The key point is that the broadcast side may have different partitions that 
interleave. If you can make sure the messages you want ordered go into 
the same partition, then I think the order can be preserved.

Best regards!
Sent from Mail for Windows

From: Qing Lim
Sent: 5 October 2022 15:16
To: xljtswf2022
Cc: User
Subject: RE: Re:Question about Flink Broadcast State event ordering

Hi, thanks for answering my question.

Is there any way to make the order reflect the upstream? I wish to broadcast 
messages that have deletion semantics, so ordering matters here.
I guess in the worst case I can use some logical timestamp to reason about order 
downstream.

From: xljtswf2022 <xljtswf2...@163.com>
Sent: 05 October 2022 03:02
To: Qing Lim <q@mwam.com>
Cc: User <user@flink.apache.org>
Subject: Re:Question about Flink Broadcast State event ordering

Hi Qing:
> I think this is referring to the order between broadcasted element and non 
> broadcasted element, right?
  No, as the broadcast and non-broadcast streams are different streams, they will 
usually be transferred over different TCP connections, and we cannot control the 
order of elements across different connections.
> The broadcasted element should arrive in the same order across all tasks, 
> right?
No. Imagine the broadcast stream has 2 partitions, say p1 and p2, and each 
partition has elements with index 1, 2, 3.
Then one downstream task may see the broadcast stream as p1-1, p1-2, p2-1, 
p2-2...
and another will see p1-1, p2-1, p1-2, p2-2.
PS: elements usually come in bulk, so the index is just for explanation.
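
The p1/p2 example can be replayed in a few lines of plain Python. This is only an illustrative sketch, not Flink code: `merge` and `per_partition_order` are hypothetical helpers, and the two schedules are assumed consumption orders for two downstream tasks. Each task reads every partition in FIFO order, yet the merged sequences differ:

```python
def merge(partitions, schedule):
    """Consume one element per schedule entry, FIFO within each partition."""
    iters = {pid: iter(elems) for pid, elems in partitions.items()}
    return [next(iters[pid]) for pid in schedule]


def per_partition_order(seen, pid):
    """Indexes of the elements from one partition, in the order a task saw them."""
    return [idx for p, idx in seen if p == pid]


partitions = {
    "p1": [("p1", 1), ("p1", 2)],
    "p2": [("p2", 1), ("p2", 2)],
}

# Two downstream tasks that happen to drain the partitions differently.
task_a = merge(partitions, ["p1", "p1", "p2", "p2"])
task_b = merge(partitions, ["p1", "p2", "p1", "p2"])

print(task_a)  # p1-1, p1-2, p2-1, p2-2
print(task_b)  # p1-1, p2-1, p1-2, p2-2
print(task_a == task_b)  # False: the overall order differs between tasks
print(per_partition_order(task_a, "p1") == per_partition_order(task_b, "p1"))  # True
```

Order within a single partition is the same for both tasks; only the interleaving across partitions changes.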

Best regards!



At 2022-10-04 21:54:23, "Qing Lim" <q@mwam.com> wrote:
Hi Flink user group,

I have a question around broadcast.

Reading the docs 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations,
 it says the following:

> Order of events in Broadcast State may differ across tasks: Although 
> broadcasting the elements of a stream guarantees that all elements will 
> (eventually) go to all downstream tasks, elements may arrive in a different 
> order to each task. So the state updates for each incoming element MUST NOT 
> depend on the ordering of the incoming events.

I think this is referring to the order between broadcasted element and non 
broadcasted element, right?
The broadcasted element should arrive in the same order across all tasks, right?

For example, given a broadcasted stream A, and a non-broadcasted stream B

When joining A and B, elements from A should always reach all tasks in the same 
order right? It’s just the interleaving of A and B that might differ across 
tasks, did I understand it correctly? I wasn’t sure because it’s not clear to me 
by just reading the doc, happy to update the doc once it’s clarified here.

Kind regards.



This e-mail and any attachments are confidential to the addressee(s) and may 
contain information that is legally privileged and/or confidential. If you are 
not the intended recipient of this e-mail you are hereby notified that any 
dissemination, distribution, or copying of its content is strictly prohibited. 
If you have received this message in error, please notify the sender by return 
e-mail and destroy the message and all copies in your possession.

To find out more details about how we may collect, use and share your personal 
information, please see https://www.mwam.com/privacy-policy. This includes 
details of how calls you make to us may be recorded in order for us to comply 
with our legal and regulatory obligations.

To the extent that the contents of this email constitutes a financial 
promotion, please note that it is issued only to and/or

RE: Re:Question about Flink Broadcast State event ordering

2022-10-10 Thread Schwalbe Matthias
Hi Qing again,

Another point to consider: broadcast streams are subject to watermarking, i.e.

  *   You can wait to process the broadcast records until after the watermark 
has passed, then
  *   order those records by time
  *   keep all broadcast records for which the watermark has not yet passed in 
some extra data structure without processing them
  *   that also means the broadcast stream should not be configured to use 
watermark idleness, or you need to manually implement watermark processing logic 
for the idling broadcast stream

This sounds a little complicated, but can definitely be done (I do that all the 
time 😊 )
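
The buffering steps above can be sketched in plain Python. This is not Flink code: `WatermarkBuffer` is a hypothetical helper, and the record payloads are made up for illustration. It holds back broadcast records until a watermark covers their timestamp and then releases them in timestamp order:

```python
import heapq
import itertools


class WatermarkBuffer:
    """Holds timestamped records until a watermark allows them to be released."""

    def __init__(self):
        self._heap = []
        # Arrival counter so that records themselves are never compared.
        # Records with equal timestamps are released in arrival order, which
        # may differ per task, so this sketch assumes unique timestamps.
        self._arrival = itertools.count()

    def add(self, timestamp, record):
        heapq.heappush(self._heap, (timestamp, next(self._arrival), record))

    def on_watermark(self, watermark):
        """Return all buffered records with timestamp <= watermark, ordered by time."""
        ready = []
        while self._heap and self._heap[0][0] <= watermark:
            ts, _, record = heapq.heappop(self._heap)
            ready.append((ts, record))
        return ready


# Two tasks receive the same broadcast records in different arrival orders.
task_a, task_b = WatermarkBuffer(), WatermarkBuffer()
for ts, rec in [(1, "put k"), (2, "delete k"), (5, "put j")]:
    task_a.add(ts, rec)
for ts, rec in [(2, "delete k"), (5, "put j"), (1, "put k")]:
    task_b.add(ts, rec)

released_a = task_a.on_watermark(3)
released_b = task_b.on_watermark(3)
print(released_a)                # [(1, 'put k'), (2, 'delete k')]
print(released_a == released_b)  # True
```

The record with timestamp 5 stays buffered in both tasks until a later watermark passes it, which is why an idle broadcast stream that never advances its watermark would stall processing.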

Best regards

Thias



From: 仙路尽头谁为峰 
Sent: Wednesday, October 5, 2022 10:13 AM
To: Qing Lim 
Cc: User 
Subject: [SPAM] Reply: Re:Question about Flink Broadcast State event ordering


Hi Qing:
  The key point is that the broadcast side may have different partitions that 
interleave. If you can make sure the messages you want ordered go into 
the same partition, then I think the order can be preserved.

Best regards!
Sent from Mail for Windows

From: Qing Lim
Sent: 5 October 2022 15:16
To: xljtswf2022
Cc: User
Subject: RE: Re:Question about Flink Broadcast State event ordering

Hi, thanks for answering my question.

Is there any way to make the order reflect the upstream? I wish to broadcast 
messages that have deletion semantics, so ordering matters here.
I guess in the worst case I can use some logical timestamp to reason about order 
downstream.

From: xljtswf2022 <xljtswf2...@163.com>
Sent: 05 October 2022 03:02
To: Qing Lim <q@mwam.com>
Cc: User <user@flink.apache.org>
Subject: Re:Question about Flink Broadcast State event ordering

Hi Qing:
> I think this is referring to the order between broadcasted element and non 
> broadcasted element, right?
  No, as the broadcast and non-broadcast streams are different streams, they will 
usually be transferred over different TCP connections, and we cannot control the 
order of elements across different connections.
> The broadcasted element should arrive in the same order across all tasks, 
> right?
No. Imagine the broadcast stream has 2 partitions, say p1 and p2, and each 
partition has elements with index 1, 2, 3.
Then one downstream task may see the broadcast stream as p1-1, p1-2, p2-1, 
p2-2...
and another will see p1-1, p2-1, p1-2, p2-2.
PS: elements usually come in bulk, so the index is just for explanation.

Best regards!



At 2022-10-04 21:54:23, "Qing Lim" <q@mwam.com> wrote:
Hi Flink user group,

I have a question around broadcast.

Reading the docs 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations,
 it says the following:

> Order of events in Broadcast State may differ across tasks: Although 
> broadcasting the elements of a stream guarantees that all elements will 
> (eventually) go to all downstream tasks, elements may arrive in a different 
> order to each task. So the state updates for each incoming element MUST NOT 
> depend on the ordering of the incoming events.

I think this is referring to the order between broadcasted element and non 
broadcasted element, right?
The broadcasted element should arrive in the same order across all tasks, right?

For example, given a broadcasted stream A, and a non-broadcasted stream B

When joining A and B, elements from A should always reach all tasks in the same 
order right? It’s just the interleaving of A and B that might differ across 
tasks, did I understand it correctly? I wasn’t sure because it’s not clear to me 
by just reading the doc, happy to update the doc once it’s clarified here.

Kind regards.




Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Jark Wu
Thanks for discovering this problem, Qingsheng!

I'm also +1 for reverting the breaking changes.

IIUC, currently, the behavior of "numXXXOut" metrics of the new and old
sink is inconsistent.
We have to break one of them to have consistent behavior. Sink V2 is an
evolving API which is just introduced in 1.15.
I think it makes sense to break the unstable API instead of the stable API
which many connectors and users depend on.

Best,
Jark



On Mon, 10 Oct 2022 at 11:36, Jingsong Li  wrote:

> Thanks for driving, Qingsheng.
>
> +1 for reverting sink metric name.
>
> We often forget that metric is also one of the important APIs.
>
> +1 for releasing 1.15.3 to fix this.
>
> Best,
> Jingsong
>
> On Sun, Oct 9, 2022 at 11:35 PM Becket Qin  wrote:
> >
> > Thanks for raising the discussion, Qingsheng,
> >
> > +1 on reverting the breaking changes.
> >
> > In addition, we might want to release a 1.15.3 to fix this and update
> the previous release docs with this known issue, so that users can upgrade
> to 1.15.3 when they hit it. It would also be good to add some backwards
> compatibility tests on metrics to avoid unintended breaking changes like
> this in the future.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sun, Oct 9, 2022 at 10:35 AM Qingsheng Ren  wrote:
> >>
> >> Hi devs and users,
> >>
> >> I’d like to start a discussion about reverting a breaking change about
> sink metrics made in 1.15 by FLINK-26126 [1] and FLINK-26492 [2].
> >>
> >> TL;DR
> >>
> >> All sink metrics with name “numXXXOut” defined in FLIP-33 are replaced
> by “numXXXSend” in FLINK-26126 and FLINK-26492. Considering metric names
> are public APIs, this is a breaking change to end users and not backward
> compatible. Also unfortunately this breaking change was not discussed in
> the mailing list before.
> >>
> >> Background
> >>
> >> As defined previously in FLIP-33 (the FLIP page has been changed so
> please refer to the old version [3] ), metric “numRecordsOut” is used for
> reporting the total number of output records since the sink started (number
> of records written to the external system), and similarly for
> “numRecordsOutPerSecond”, “numBytesOut”, “numBytesOutPerSecond” and
> “numRecordsOutError”. Most sinks are following this naming and definition.
> However, these metrics are ambiguous in the new Sink API as “numXXXOut”
> could be used by the output of SinkWriterOperator for reporting number of
> Committables delivered to SinkCommitterOperator. In order to resolve the
> conflict, FLINK-26126 and FLINK-26492 changed names of these metrics with
> “numXXXSend”.
> >>
> >> Necessity of reverting this change
> >>
> >> - Metric names are actually public API, as end users need to configure
> metric collecting and alerting system with metric names. Users have to
> reset all configurations related to affected metrics.
> >> - This could also affect custom and external sinks not maintained by
> Flink, which might have implemented with numXXXOut metrics.
> >> - The number of records sent to external system is way more important
> than the number of Committables sent to SinkCommitterOperator, as the
> latter one is just an internal implementation of sink. We could have a new
> metric name for the latter one instead.
> >> - We could avoid splitting the project by version (like “plz use
> numXXXOut before 1.15 and use numXXXSend after”) if we revert it ASAP,
> considering 1.16 is still not released for now.
> >>
> >> As a consequence, I’d like to hear from devs and users about your
> opinion on changing these metrics back to “numXXXOut”.
> >>
> >> Looking forward to your reply!
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-26126
> >> [2] https://issues.apache.org/jira/browse/FLINK-26492
> >> [3] FLIP-33, version 18:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883136
> >>
> >> Best,
> >> Qingsheng
>