[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2021-01-04 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17258689#comment-17258689
 ] 

Qingsheng Ren commented on FLINK-20777:
---

I investigated the logic here further. Partition discovery should actually be 
enabled when: 1. the source is unbounded, and 2. stoppingOffset is not 
specified. 
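
The rule above can be sketched roughly as follows. This is a minimal 
illustration under stated assumptions, not the Flink implementation: the class 
DiscoveryIntervalSketch, the method effectiveIntervalMs and its parameters are 
hypothetical names, and treating -1 as "discovery disabled" is inferred from 
this thread. 

```java
/** Illustrative only: hypothetical names, not Flink classes. */
class DiscoveryIntervalSketch {
    // Default documented for "partition.discovery.interval.ms" per this issue: 30 seconds.
    static final long DOCUMENTED_DEFAULT_MS = 30_000L;
    // Assumption: -1 means periodic partition discovery is switched off.
    static final long DISABLED = -1L;

    /**
     * Returns the interval to use. Discovery stays enabled only when the source
     * is unbounded and no stopping offset was specified.
     */
    static long effectiveIntervalMs(Long userValueMs, boolean unbounded, boolean stoppingOffsetSet) {
        if (!unbounded || stoppingOffsetSet) {
            return DISABLED;
        }
        // Fall back to the documented default when the user passed nothing.
        return userValueMs != null ? userValueMs : DOCUMENTED_DEFAULT_MS;
    }

    public static void main(String[] args) {
        System.out.println(effectiveIntervalMs(null, true, false));    // unbounded, nothing set
        System.out.println(effectiveIntervalMs(null, false, true));    // bounded
        System.out.println(effectiveIntervalMs(10_000L, true, false)); // explicit user value
    }
}
```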

> Default value of property "partition.discovery.interval.ms" is not as 
> documented in new Kafka Source
> 
>
> Key: FLINK-20777
> URL: https://issues.apache.org/jira/browse/FLINK-20777
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.1
>
>
> The default value of property "partition.discovery.interval.ms" is documented 
> as 30 seconds in {{KafkaSourceOptions}}, but it will be set as -1 in 
> {{KafkaSourceBuilder}} if user doesn't pass in this property explicitly. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-30 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256853#comment-17256853
 ] 

Qingsheng Ren commented on FLINK-20777:
---

[~jark] Partition discovery is disabled by default in the 
{{FlinkKafkaConsumer}}. Under the new design of the Kafka source based on 
FLIP-27, I think it's better to enable it by default, as [~becket_qin] and I 
explained above. 



[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-30 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256437#comment-17256437
 ] 

Jark Wu commented on FLINK-20777:
-

[~renqs], what's the behavior of the current Kafka connector?



[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-30 Thread jiawen xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256395#comment-17256395
 ] 

jiawen xiao commented on FLINK-20777:
-

OK, I will create a PR for it.



[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-30 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256381#comment-17256381
 ] 

Jark Wu commented on FLINK-20777:
-

I'm also fine with enabling it by default for unbounded mode and disabling it 
for bounded mode. I think a periodic check at a 30s interval in a single task 
doesn't affect performance too much. 



[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-29 Thread jiawen xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256334#comment-17256334
 ] 

jiawen xiao commented on FLINK-20777:
-

Hi [~renqs], maybe we need more people's opinions.

Hi [~jark], WDYT?

I'm not sure whether constantly checking the Kafka metadata will have a 
performance impact.



[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-29 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256288#comment-17256288
 ] 

Jiangjie Qin commented on FLINK-20777:
--

From a Kafka user's perspective, I would expect the partition changes to be 
picked up by the Flink job automatically, so I don't have to restart the Flink 
job. Therefore, enabling partition discovery by default seems reasonable to me.

[~873925...@qq.com] I am curious about the case where people don't want to 
start consuming from the new partitions right away. Do you mind sharing some 
concrete scenarios? Thanks.



[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-29 Thread jiawen xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255887#comment-17255887
 ] 

jiawen xiao commented on FLINK-20777:
-

Actually, these scenarios depend on your production environment. The feature 
is optional and can be enabled when you need it.

You could first run an experiment to determine its impact on performance when 
it is turned on by default.

We need to judge whether this change is necessary.



[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-29 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255869#comment-17255869
 ] 

Qingsheng Ren commented on FLINK-20777:
---

From my perspective, I prefer enabling partition discovery by default. This 
is helpful when the upstream Kafka topic scales out for some reason, or a new 
topic matching the topic pattern is created, so that the Flink job doesn't 
have to restart to discover the new topic/partition. I think this discovery 
action won't introduce significant performance overhead, since it will run in 
a thread on the SplitEnumerator and won't interfere with the data consuming 
process. 
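
To illustrate the idea of running discovery on its own thread, here is a 
minimal sketch using a plain ScheduledExecutorService. It does not use Flink's 
SplitEnumerator API: the class PeriodicDiscoverySketch, the metadataFetcher 
supplier and the partition names are hypothetical stand-ins for whatever 
actually queries the Kafka metadata. 

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Illustrative only: hypothetical names, not the Flink enumerator. */
class PeriodicDiscoverySketch {
    private final Set<String> known = new HashSet<>();
    private final Supplier<Set<String>> metadataFetcher;

    PeriodicDiscoverySketch(Supplier<Set<String>> metadataFetcher) {
        this.metadataFetcher = metadataFetcher;
    }

    /** Fetches the current partitions and returns only those not seen before. */
    synchronized Set<String> discoverOnce() {
        Set<String> added = new HashSet<>(metadataFetcher.get());
        added.removeAll(known);
        known.addAll(added);
        return added;
    }

    /** Schedules discovery on the given executor; a non-positive interval disables it. */
    void start(ScheduledExecutorService executor, long intervalMs) {
        if (intervalMs <= 0) {
            return;
        }
        executor.scheduleAtFixedRate(() -> {
            Set<String> added = discoverOnce();
            if (!added.isEmpty()) {
                System.out.println("Discovered new partitions: " + added);
            }
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> partitions = new HashSet<>(Arrays.asList("topic-0", "topic-1"));
        PeriodicDiscoverySketch sketch =
                new PeriodicDiscoverySketch(() -> new HashSet<>(partitions));

        // The discovery loop runs on its own thread, separate from any reader threads.
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        sketch.start(executor, 100);
        Thread.sleep(350);
        executor.shutdownNow();
    }
}
```

The short interval in main only keeps the demo quick; the interval discussed 
in this issue is 30 seconds. 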



[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-29 Thread jiawen xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255865#comment-17255865
 ] 

jiawen xiao commented on FLINK-20777:
-

Yes, I know what you said.

In my opinion, turning off dynamic partition discovery by default is the 
design intent. Under this premise, the default value of property 
"partition.discovery.interval.ms" is only for reference.

WDYT?



[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-28 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255853#comment-17255853
 ] 

Qingsheng Ren commented on FLINK-20777:
---

Hi [~873925...@qq.com], thanks for your review. I think a better 
implementation would be to override the value to -1 only when the source is 
bounded. For unbounded mode it should be assigned the default value of 30 
seconds as documented in {{KafkaSourceOptions}} if not passed in explicitly. 
The current behavior is actually due to the implementation of the 
{{maybeOverride}} helper function.
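
As a rough illustration of how a helper of this kind could produce the 
reported behavior, consider the sketch below. It is a hypothetical 
reconstruction, not copied from the Flink source: the class OverrideSketch, 
both method signatures and the exact branching are assumptions made for the 
example. 

```java
import java.util.Properties;

/** Illustrative only: a hypothetical reconstruction, not the Flink source. */
class OverrideSketch {
    static final String KEY = "partition.discovery.interval.ms";

    /**
     * Assumed shape of the problematic helper: a missing user value is always
     * filled in with the given value, even when override is false.
     */
    static void maybeOverride(Properties props, String key, String value, boolean override) {
        String userValue = props.getProperty(key);
        if (userValue == null || override) {
            props.setProperty(key, value);
        }
    }

    /** Variant matching the proposal: force -1 only for bounded sources. */
    static void overrideOnlyIfBounded(Properties props, boolean bounded) {
        if (bounded) {
            props.setProperty(KEY, "-1");
        }
    }

    public static void main(String[] args) {
        // Unbounded source, user passed nothing: the helper still writes -1.
        Properties current = new Properties();
        maybeOverride(current, KEY, "-1", false);
        System.out.println(current.getProperty(KEY));

        // Proposed behavior: nothing is written, so the documented default applies.
        Properties proposed = new Properties();
        overrideOnlyIfBounded(proposed, false);
        System.out.println(proposed.getProperty(KEY, "30000"));
    }
}
```

With the second variant, an unbounded source that leaves the property unset 
falls through to whatever default the option itself declares. 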


[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-28 Thread jiawen xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255847#comment-17255847
 ] 

jiawen xiao commented on FLINK-20777:
-

Sorry [~renqs], I think this is not a problem. You can read the code in 
{{KafkaSourceBuilder}}.

There you will find the reason: "If the source is bounded, do not run periodic 
partition discovery."

So it's a check for bounded streams that prevents users from enabling dynamic 
partition discovery in the case of bounded sources.



[jira] [Commented] (FLINK-20777) Default value of property "partition.discovery.interval.ms" is not as documented in new Kafka Source

2020-12-27 Thread jiawen xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255350#comment-17255350
 ] 

jiawen xiao commented on FLINK-20777:
-

Hi [~renqs], thanks for raising this question. In which version did you find 
it?
