Re: FLINK-20767 - Support for nested fields filter push down

2023-08-13 Thread liu ron
Hi, Venkata krishnan

Thanks for driving this work, look forward to your FLIP.

Best,
Ron

Venkatakrishnan Sowrirajan wrote on Sun, Aug 13, 2023 at 14:34:

> Thanks, Yunhong. That's correct. I was able to make it work locally. I am
> currently in the process of writing a FLIP for the necessary changes to
> the SupportsFilterPushDown API to support nested fields filter push down.
>
> Regards
> Venkata krishnan
>
>
> On Mon, Aug 7, 2023 at 8:28 PM yh z  wrote:
>
> > Hi Venkatakrishnan,
> > Sorry for the late reply. I have looked at the code and feel like you
> > need to modify the logic of the
> > ExpressionConverter.visit(FieldReferenceExpression expression) method to
> > support nested types, which are not currently supported in the current
> > code.
> >
> > Regards,
> > Yunhong Zheng (Swuferhong)
> >
> > Venkatakrishnan Sowrirajan wrote on Mon, Aug 7, 2023 at 13:30:
> >
> > > (Sorry, I pressed send too early)
> > >
> > > Thanks for the help @zhengyunhon...@gmail.com.
> > >
> > > Agree on not changing the API as much as possible, and also on
> > > eventually simplifying projection pushdown with nested fields.
> > >
> > > In terms of the code itself, currently I am trying to leverage the
> > > FieldReferenceExpression to also handle nested fields for filter push
> > > down. But where I am currently struggling to make progress is: once
> > > the filters are pushed to the table source itself, in
> > > PushFilterIntoSourceScanRuleBase#resolveFiltersAndCreateTableSourceTable
> > > there is a conversion from List<ResolvedExpression> (in this case,
> > > FieldReferenceExpression) back to the List<RexNode> itself.
> > >
> > > If you have some pointers for that, please let me know. Thanks.
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > >
> > > On Sun, Aug 6, 2023 at 10:23 PM Venkatakrishnan Sowrirajan <
> > > vsowr...@asu.edu>
> > > wrote:
> > >
> > > > Thanks @zhengyunhon...@gmail.com
> > > > Regards
> > > > Venkata krishnan
> > > >
> > > >
> > > > On Sun, Aug 6, 2023 at 6:16 PM yh z wrote:
> > > >
> > > >> Hi, Venkatakrishnan,
> > > >> I think this is a very useful feature. I have been focusing on the
> > > >> development of the flink-table-planner module recently, so if you
> > > >> need some help, I can assist you in completing the development of
> > > >> some sub-tasks or with code review.
> > > >>
> > > >> Returning to the design itself, I think it's necessary to modify
> > > >> FieldReferenceExpression or re-implement a
> > > >> NestedFieldReferenceExpression.
> > > >> As for modifying the interface of SupportsProjectionPushDown, I think
> > > >> we need to make some trade-offs. As a connector developer, the
> > > >> stability of the interface is very important. If there are no
> > > >> unresolved bugs, I personally do not recommend modifying the
> > > >> interface. However, when I first read the code of
> > > >> SupportsProjectionPushDown, the design of int[][] was very confusing
> > > >> for me, and it took me a long time to understand it by running
> > > >> specific UT tests. Therefore, in terms of the design of this
> > > >> interface and the consistency between different interfaces, there is
> > > >> indeed room for improvement.
> > > >>
> > > >> Thanks,
> > > >> Yunhong Zheng (Swuferhong)
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> Becket Qin wrote on Thu, Aug 3, 2023 at 07:44:
> > > >>
> > > >> > Hi Jark,
> > > >> >
> > > >> > If the FieldReferenceExpression contains an int[] to support a
> > > >> > nested field reference, List<FieldReferenceExpression> (or
> > > >> > FieldReferenceExpression[]) and int[][] are actually equivalent.
> > > >> > If we are designing this from scratch, personally I prefer using
> > > >> > List<FieldReferenceExpression> for consistency, i.e. always
> > > >> > resolving everything to expressions for users. Projection is a
> > > >> > simpler case, but it should not be a special case. This avoids
> > > >> > doing the same thing in different ways, which is also confusing to
> > > >> > the users. To me, the int[][] format would become kind of a
> > > >> > technical debt after we extend the FieldReferenceExpression.
> > > >> > Although we don't have to address it right away in the same FLIP,
> > > >> > this kind of debt accumulates over time and makes the project
> > > >> > harder to learn and maintain. So, personally I prefer to address
> > > >> > these technical debts as soon as possible.
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Jiangjie (Becket) Qin
> > > >> >
> > > >> > On Wed, Aug 2, 2023 at 8:19 PM Jark Wu  wrote:
> > > >> >
> > > >> > > Hi,
> > > >> > >
> > > >> > > I agree with Becket that we may need to extend
> > > >> > > FieldReferenceExpression to support nested field access (or
> > > >> > > maybe a new NestedFieldReferenceExpression).
> > > >> > > But I have some concerns about evolving
> > > >> > > SupportsProjectionPushDown.applyProjection.
> > > >> > > A projection is much simpler than a filter expression, which only
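
To make the int[][] discussion in the thread above concrete, here is a minimal
Java sketch of how the int[][] format of SupportsProjectionPushDown#applyProjection
encodes nested projections, together with an expression-style equivalent. The
NestedFieldReference class is purely illustrative of the idea being debated
(extending FieldReferenceExpression or adding a NestedFieldReferenceExpression);
it is not an existing Flink API.

public class NestedProjectionSketch {

    // Hypothetical nested field reference, standing in for an extended
    // FieldReferenceExpression or a new NestedFieldReferenceExpression.
    static final class NestedFieldReference {
        final String[] fieldNames;  // e.g. {"user", "name"}
        final int[] fieldIndices;   // e.g. {1, 0} -- the same path as int[][]

        NestedFieldReference(String[] fieldNames, int[] fieldIndices) {
            this.fieldNames = fieldNames;
            this.fieldIndices = fieldIndices;
        }
    }

    public static void main(String[] args) {
        // Given a schema ROW< id INT, user ROW< name STRING, age INT > >,
        // each inner array is a path of field indices into nested rows:
        int[][] projectedFields = {
            {0},    // top-level field "id"
            {1, 0}  // nested field "user.name"
        };

        // The equivalent expression-based form: one reference per path.
        NestedFieldReference[] asExpressions = {
            new NestedFieldReference(new String[] {"id"}, new int[] {0}),
            new NestedFieldReference(new String[] {"user", "name"}, new int[] {1, 0})
        };

        System.out.println(projectedFields.length == asExpressions.length); // true
    }
}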

[jira] [Created] (FLINK-32859) Improve the state of adaptive scheduler with StateWithoutExecutionGraph

2023-08-13 Thread Rui Fan (Jira)
Rui Fan created FLINK-32859:
---

 Summary: Improve the state of adaptive scheduler with 
StateWithoutExecutionGraph
 Key: FLINK-32859
 URL: https://issues.apache.org/jira/browse/FLINK-32859
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Rui Fan
Assignee: Rui Fan


Currently, most states of the adaptive scheduler extend 
StateWithExecutionGraph, which eliminates a lot of repeated code.

We can define a StateWithoutExecutionGraph and the corresponding Context; 
then the remaining states, such as Created, WaitingForResources, and 
CreatingExecutionGraph, can extend StateWithoutExecutionGraph.

This improvement can remove a lot of code from these states, such as: 
cancel(), suspend(), getJob(), handleGlobalFailure() and getLogger().

These methods are the same in Created, WaitingForResources and 
CreatingExecutionGraph.
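
A rough sketch of the proposed shape, with simplified and assumed signatures 
(this is illustrative only, not the actual flink-runtime code):

// States that hold no ExecutionGraph share one base class.
abstract class StateWithoutExecutionGraph {

    interface Context {
        // Assumed hook that moves the scheduler to a terminal state.
        void transitionToFinished(Throwable cause);
    }

    private final Context context;

    StateWithoutExecutionGraph(Context context) {
        this.context = context;
    }

    // Logic that Created, WaitingForResources and CreatingExecutionGraph
    // currently each implement separately:
    public void cancel() {
        context.transitionToFinished(null);
    }

    public void suspend(Throwable cause) {
        context.transitionToFinished(cause);
    }

    public void handleGlobalFailure(Throwable cause) {
        context.transitionToFinished(cause);
    }
}

// Each remaining state then keeps only its state-specific logic:
class Created extends StateWithoutExecutionGraph {
    Created(Context context) {
        super(context);
    }
    // ... Created-specific behavior only
}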



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32860) Add missing visibility annotation for Table APIs

2023-08-13 Thread Jane Chan (Jira)
Jane Chan created FLINK-32860:
-

 Summary: Add missing visibility annotation for Table APIs
 Key: FLINK-32860
 URL: https://issues.apache.org/jira/browse/FLINK-32860
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.19.0
Reporter: Jane Chan


Based on the 
[discussion|https://lists.apache.org/thread/zl2rmodsjsdb49tt4hn6wv3gdwo0m31o], 
all classes in flink-table-api-java, flink-table-api-java-bridge, and 
flink-table-common should be marked with proper visibility annotations.
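
For context, a minimal example of what such annotations look like. The 
@PublicEvolving and @Internal annotations are real Flink API from 
flink-annotations; the two classes below are hypothetical placeholders:

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;

/** A user-facing API class: exposed, but may still evolve across releases. */
@PublicEvolving
public class SomeTableApiClass { }

/** An implementation detail that users must not depend on. */
@Internal
class SomeTableRuntimeHelper { }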



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32861) Batch mode: when sinking to multiple tables, the job graph on the web monitoring page lacks a sink task block

2023-08-13 Thread zhengyuan (Jira)
zhengyuan created FLINK-32861:
-

 Summary: Batch mode: when sinking to multiple tables, the job graph 
on the web monitoring page lacks a sink task block
 Key: FLINK-32861
 URL: https://issues.apache.org/jira/browse/FLINK-32861
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Affects Versions: 1.16.3
 Environment: flink 1.16

centos 7 64

mysql 5.7

paimon 0.5

open jdk 1.8 64
Reporter: zhengyuan
 Attachments: flink-mult-out.sql, screenshot.png

In batch mode, when sinking to multiple tables (MySQL, Paimon), the job graph 
shown on the web monitoring page lacks a sink task block. Two sink task blocks 
are expected. The query results themselves are correct, however.


For the Flink SQL details, see the attachments flink-mult-out.sql and screenshot.png.

flink-mult-out.sql:

==

SET execution.checkpointing.interval=1;
SET 
state.checkpoints.dir=hdfs://hadoop01:9000/flink/checkpoints/20230814103606840;
SET execution.runtime-mode=batch;
CREATE TABLE source_jdbc_9kT_QLyGtM(
`id` BIGINT,
`tenant_code` STRING,
`ces2` STRING,
`ces1` STRING,
`address` STRING,
`amount2` FLOAT,
`bizdate` DATE)  WITH ( 
    'connector'='jdbc',
    'scan.fetch-size'='30',
    
'url'='jdbc:mysql://10.x.x.22:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai',
    'username'='xxx',
    'password'='',
    'table-name'='v_csmx_129_default'
);

 

CREATE VIEW tranform_sql_mapping AS select 
`id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from 
source_jdbc_9kT_QLyGtM where  ( `id`<5 );


CREATE TABLE data_processing_out_1(
`id` BIGINT,
`tenant_code` STRING,
`ces2` STRING,
`ces1` STRING,
`address` STRING,
`amount2` FLOAT,
`bizdate` DATE)  WITH ( 
    'connector'='jdbc',
    'sink.buffer-flush.max-rows'='5',
    'sink.buffer-flush.interval'='0',
    
'url'='jdbc:mysql://10.x.x.22:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai',
    'username'='',
    'password'='x',
    'table-name'='data_processing_out_1'
);

INSERT INTO data_processing_out_1 select 
`id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from 
tranform_sql_mapping;

CREATE CATALOG paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://hadoop01:9000/painmon/data-processing/paimon_ods'   
);

USE CATALOG paimon;
create database if not exists paimon.paimon_ods_db;
drop table if exists paimon_ods_db.paimon_mysql_test01;
CREATE TABLE if not exists paimon_ods_db.paimon_mysql_test01(
`id` BIGINT,
`tenant_code` STRING,
`ces2` STRING,
`ces1` STRING,
`address` STRING,
`amount2` FLOAT,
`bizdate` DATE
)  WITH (
   'sink.parallelism'='8',
   'bucket'='8',
   'bucket-key'='tenant_code',
   'sink.use-managed-memory-allocator'='true',
   'sink.managed.writer-buffer-memory'='512MB',
   'num-sorted-run.compaction-trigger'='20',
   'write-buffer-size'='1024MB',
   'write-buffer-spillable'='true',
   'write-mode'='append-only'
);

INSERT INTO paimon_ods_db.paimon_mysql_test01 select 
`id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from 
default_catalog.default_database.tranform_sql_mapping;

 

==

results

==

trino> use paimon.paimon_ods_db;
USE
trino:paimon_ods_db> select count(*) from paimon_mysql_test01;
 _col0 
-------
     4 
(1 row)

 

mysql> SELECT count(*) FROM data_storage.data_processing_out_1;
+----------+
| count(*) |
+----------+
|        4 |
+----------+
1 row in set (0.06 sec)

mysql> 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-13 Thread Xuannan Su
Hi Xintong,

Thanks for the reply.

I have considered using the timestamp in the records to determine the backlog 
status, and decided to use the watermark in the end. By definition, the 
watermark is the time progress indication in the data stream. It indicates 
that the stream’s event time has progressed to some specific time. On the 
other hand, timestamps in the records are usually used to generate the 
watermark. Therefore, it appears more appropriate and intuitive to calculate 
the event-time lag from the watermark and determine the backlog status 
accordingly. And by using the watermark, we can easily deal with 
out-of-orderness and idleness in the data.

Please let me know if you have further questions.

Best,
Xuannan
On Aug 10, 2023, 20:23 +0800, Xintong Song , wrote:
> Thanks for preparing the FLIP, Xuannan.
>
> +1 in general.
>
> A quick question, could you explain why we are relying on the watermark for
> emitting the record attribute? Why not use timestamps in the records? I
> don't see any concern in using watermarks. Just wondering if there's any
> deep considerations behind this.
>
> Best,
>
> Xintong
>
>
>
> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:
>
> > Hi all,
> >
> > I am opening this thread to discuss FLIP-328: Allow source operators to
> > determine isProcessingBacklog based on watermark lag[1]. We had several
> > discussions with Dong Lin about the design; thanks for all the valuable
> > advice.
> >
> > The FLIP aims to target the use case where users want to run a Flink job
> > to backfill historical data in a high-throughput manner and then continue
> > processing real-time data with low latency. Building upon the backlog
> > concept introduced in FLIP-309[2], this proposal enables sources to
> > report their backlog-processing status based on the watermark lag.
> >
> > We would greatly appreciate any comments or feedback you may have on this
> > proposal.
> >
> > Best,
> > Xuannan
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >
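
For readers skimming the thread, a minimal sketch of the proposed mechanism, 
using placeholder names rather than the FLIP's actual API:

// Watermark-lag-based backlog detection (illustrative sketch only).
class BacklogDetector {

    private final long lagThresholdMs;

    BacklogDetector(long lagThresholdMs) {
        this.lagThresholdMs = lagThresholdMs;
    }

    // The event-time lag is the gap between the current processing time and
    // the latest watermark; the source reports that it is processing backlog
    // while this lag exceeds the configured threshold.
    boolean isProcessingBacklog(long latestWatermarkMs, long currentTimeMs) {
        return (currentTimeMs - latestWatermarkMs) > lagThresholdMs;
    }
}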


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-13 Thread Xintong Song
Thanks for the explanation.

I wonder if it makes sense to not expose this detail via the configuration
option. To be specific, I suggest not mentioning the "watermark" keyword in
the configuration key and description.

   - From the users' perspective, I think they only need to know that when
   there's a lag higher than the given threshold, Flink will consider the
   latency of individual records less important and prioritize throughput
   over it. They don't really need the details of how the lags are
   calculated.
   - For the internal implementation, I also think using watermark lags is
   a good idea, for the reasons you've already mentioned. However, it's not
   the only possible option. Hiding this detail from users would give us the
   flexibility to switch to other approaches if needed in the future.
   - We are currently working on designing the ProcessFunction API
   (consider it a DataStream API V2). There's an idea to introduce a
   Generalized Watermark mechanism, where basically the watermark can be
   anything that needs to travel along the data flow with certain alignment
   strategies, and the event-time watermark would be one specific case of
   it. This is still an idea that has not been discussed and agreed on by
   the community, and we are preparing a FLIP for it. But if we go down that
   road, the concept "watermark-lag-threshold" could become ambiguous.

I do not intend to block the FLIP on this. I'd also be fine with
introducing the configuration as is, and changing it later, if needed, with
a regular deprecation and migration process. Just making my suggestions.


Best,

Xintong



On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su  wrote:

> Hi Xintong,
>
> Thanks for the reply.
>
> I have considered using the timestamp in the records to determine the
> backlog status, and decided to use the watermark in the end. By definition,
> the watermark is the time progress indication in the data stream. It
> indicates that the stream’s event time has progressed to some specific
> time. On the other hand, timestamps in the records are usually used to
> generate the watermark. Therefore, it appears more appropriate and
> intuitive to calculate the event-time lag from the watermark and determine
> the backlog status accordingly. And by using the watermark, we can easily
> deal with out-of-orderness and idleness in the data.
>
> Please let me know if you have further questions.
>
> Best,
> Xuannan
> On Aug 10, 2023, 20:23 +0800, Xintong Song , wrote:
> > Thanks for preparing the FLIP, Xuannan.
> >
> > +1 in general.
> >
> > A quick question, could you explain why we are relying on the watermark
> > for emitting the record attribute? Why not use timestamps in the records?
> > I don't see any concern in using watermarks. Just wondering if there's
> > any deep considerations behind this.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:
> >
> > > Hi all,
> > >
> > > I am opening this thread to discuss FLIP-328: Allow source operators to
> > > determine isProcessingBacklog based on watermark lag[1]. We had several
> > > discussions with Dong Lin about the design; thanks for all the valuable
> > > advice.
> > >
> > > The FLIP aims to target the use case where users want to run a Flink
> > > job to backfill historical data in a high-throughput manner and then
> > > continue processing real-time data with low latency. Building upon the
> > > backlog concept introduced in FLIP-309[2], this proposal enables
> > > sources to report their backlog-processing status based on the
> > > watermark lag.
> > >
> > > We would greatly appreciate any comments or feedback you may have on
> > > this proposal.
> > >
> > > Best,
> > > Xuannan
> > >
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > [2]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > >
>
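
To illustrate the naming question raised above, a sketch of the two candidate 
styles as Flink ConfigOptions. The ConfigOptions builder API is real, but both 
option keys here are hypothetical:

import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

class BacklogOptionsSketch {

    // Mechanism-revealing key, along the lines of the FLIP's proposal:
    static final ConfigOption<Duration> WATERMARK_LAG_THRESHOLD =
            ConfigOptions.key("scan.watermark-lag-threshold")
                    .durationType()
                    .noDefaultValue()
                    .withDescription(
                            "Report backlog while the watermark lag exceeds this value.");

    // Mechanism-neutral alternative, in the spirit of Xintong's suggestion:
    static final ConfigOption<Duration> BACKLOG_LAG_THRESHOLD =
            ConfigOptions.key("scan.backlog.lag-threshold")
                    .durationType()
                    .noDefaultValue()
                    .withDescription(
                            "Report backlog while the event-time lag exceeds this value.");
}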


[jira] [Created] (FLINK-32862) Support INIT operation type to be compatible with DTS on Alibaba Cloud

2023-08-13 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-32862:
-

 Summary: Support INIT operation type to be compatible with DTS on 
Alibaba Cloud
 Key: FLINK-32862
 URL: https://issues.apache.org/jira/browse/FLINK-32862
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Hang Ruan


Canal JSON messages from DTS on Alibaba Cloud may carry a new operation type, 
`INIT`, which we currently cannot handle.
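
A minimal sketch of how a deserializer could accommodate the new type. 
Treating INIT snapshot rows as inserts is an assumption about the desired 
behavior; this is not the actual flink-json code:

import org.apache.flink.types.RowKind;

class CanalOpTypeMapper {
    static RowKind toRowKind(String canalType) {
        switch (canalType) {
            case "INSERT":
            case "INIT": // DTS on Alibaba Cloud: initial/full snapshot rows
                return RowKind.INSERT;
            case "DELETE":
                return RowKind.DELETE;
            default:
                // "UPDATE" expands into UPDATE_BEFORE + UPDATE_AFTER and is
                // handled separately in a real implementation.
                throw new IllegalArgumentException("Unsupported type: " + canalType);
        }
    }
}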



--
This message was sent by Atlassian Jira
(v8.20.10#820010)