[jira] [Commented] (FLINK-10348) Solve data skew when consuming data from kafka

2018-09-17 Thread Jiayi Liao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617952#comment-16617952
 ] 

Jiayi Liao commented on FLINK-10348:


[~elevy]
OK, what you said makes more sense to me, and I agree with your first point.
But for the second one, there will always be a partition with the lowest 
watermark, so it is very hard to design a strategy that "selectively forwards 
messages" unless you consume one partition at a time, which is very bad.
I can't think of any better ideas :(.


> Solve data skew when consuming data from kafka
> --
>
> Key: FLINK-10348
> URL: https://issues.apache.org/jira/browse/FLINK-10348
> Project: Flink
> Issue Type: New Feature
> Components: Kafka Connector
> Affects Versions: 1.6.0
> Reporter: Jiayi Liao
> Assignee: Jiayi Liao
> Priority: Major
>
> When using KafkaConsumer, our strategy is to send fetch requests to brokers 
> with a fixed fetch size. Assume topic x has n partitions and the data is 
> skewed between partitions. If we consume topic x from the earliest offset, 
> every fetch request can return the max fetch size of data. The problem is 
> that when a task consumes from both "big" partitions and "small" partitions, 
> the data in the "big" partitions may become late elements because the "small" 
> partitions are consumed faster.
> *Solution:*
> I think we can leverage two parameters to control this.
> 1. data.skew.check // whether to check data skew
> 2. data.skew.check.interval // the interval between checks
> Every data.skew.check.interval, we check the latest offset of each 
> partition and calculate (latest offset - current offset), then pick the 
> partitions which need to slow down and redefine their fetch size.
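
The periodic check proposed in the description could look roughly like the 
sketch below. It is only an illustration: the function name plan_fetch_sizes, 
the proportional scaling rule, and the min_fetch_bytes floor are assumptions 
that do not appear in the issue, and no real Kafka or Flink API is used.

```python
from typing import Dict


def plan_fetch_sizes(
    latest: Dict[int, int],
    current: Dict[int, int],
    max_fetch_bytes: int,
    min_fetch_bytes: int = 1024,  # hypothetical floor so no partition starves
) -> Dict[int, int]:
    """Give each partition a fetch size proportional to its lag.

    lag = latest offset - current offset, as in the issue description.
    The partition with the largest lag gets max_fetch_bytes; partitions
    that are closer to the head are slowed down with a smaller fetch size.
    """
    lags = {p: max(latest[p] - current[p], 0) for p in latest}
    max_lag = max(lags.values(), default=0)
    if max_lag == 0:
        # Nothing is behind, so there is no skew to correct.
        return {p: max_fetch_bytes for p in lags}
    return {
        p: max(min_fetch_bytes, max_fetch_bytes * lag // max_lag)
        for p, lag in lags.items()
    }


# Partition 0 is "big" (lag 1000), partition 1 is "small" (lag 50).
sizes = plan_fetch_sizes(
    latest={0: 1000, 1: 100},
    current={0: 0, 1: 50},
    max_fetch_bytes=1024 * 1024,
)
```

Such a function would be called once per data.skew.check.interval. Note that 
it balances offsets, not timestamps, so it only approximates event-time 
alignment.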



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10348) Solve data skew when consuming data from kafka

2018-09-17 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617899#comment-16617899
 ] 

Elias Levy commented on FLINK-10348:


[~wind_ljy]

Re: 1.  The problem is timestamp alignment.  Settings like fetch sizes, max 
waits, etc. are simply mechanisms you can use to try to influence the rate of 
processing to better align the timestamps.  Those mechanisms are at least one 
level removed from the actual issue.  It is best to address the issue directly 
by attempting to align timestamps during consumption.

Re: 2.  Internally the Kafka consumer behaves like a multiple input operator, 
merging watermarks and messages from each partition, which it then forwards 
downstream.  The Kafka consumer can also selectively forward messages from the 
partitions with the lowest watermark if they are available.
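
A minimal sketch of that idea, under stated assumptions: the merged watermark 
is taken as the minimum over the partitions, and the next message is forwarded 
from the lowest-watermark partition that has something buffered. The names 
merged_watermark and next_partition are hypothetical, and falling back to the 
next-lowest partition when the lowest one is empty is one possible reading of 
"if they are available", not something the comment specifies.

```python
from collections import deque
from typing import Deque, Dict, Optional


def merged_watermark(watermarks: Dict[int, int]) -> int:
    """The consumer's overall watermark is held back by its slowest partition."""
    return min(watermarks.values())


def next_partition(
    watermarks: Dict[int, int],
    buffers: Dict[int, Deque[str]],
) -> Optional[int]:
    """Pick the partition to forward from next.

    Prefers the partition with the lowest watermark. If that partition has
    no buffered messages, falls back to the next-lowest one that does
    (assumption), so the consumer never stalls on an empty partition.
    """
    ready = [p for p, buf in buffers.items() if buf]
    if not ready:
        return None
    return min(ready, key=lambda p: (watermarks[p], p))


watermarks = {0: 50, 1: 10, 2: 30}
buffers = {0: deque(["a"]), 1: deque(), 2: deque(["c"])}

first_choice = next_partition(watermarks, buffers)   # partition 1 is empty
buffers[1].append("b")
second_choice = next_partition(watermarks, buffers)  # partition 1 now has data
```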



[jira] [Commented] (FLINK-10348) Solve data skew when consuming data from kafka

2018-09-16 Thread Jiayi Liao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617054#comment-16617054
 ] 

Jiayi Liao commented on FLINK-10348:


[~elevy]
1. From my perspective, we use fetch requests to request data, and the 
parameters (fetch size, max wait, ...) are set by our Flink programs, so it's 
our job to decide how to request this data.
2. About the multiple input operators, they won't help if there are more 
partitions than the parallelism, because the operator consumes the newest data 
and the oldest data at the same time, which makes it hard to generate the 
watermark.



[jira] [Commented] (FLINK-10348) Solve data skew when consuming data from kafka

2018-09-16 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16616933#comment-16616933
 ] 

Elias Levy commented on FLINK-10348:


I would suggest the strategy employed by Kafka Streams, which makes a best 
effort attempt to align streams by selectively fetching from the stream with 
the lowest watermark if there are messages available.

Rather than implementing something like this within the Kafka connector 
sources, which are independent tasks in Flink, I would suggest implementing it 
within multiple input operators. The operator can selectively process messages 
from the input stream with the lowest watermark if they are available. Back 
pressure can then take care of slowing down the higher volume input if 
necessary.
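
To illustrate the selection rule, here is a rough sketch of an operator loop 
that always takes the next message from the input whose head message has the 
lowest timestamp. The function name drain_aligned and the in-memory queues are 
assumptions made for the example; it does not reproduce the Kafka Streams or 
Flink code. In a real operator the queues would be bounded, so an input that 
is not being selected fills up and back pressure slows its producer; that 
part is not modeled here.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple

Record = Tuple[int, str]  # (timestamp, value)


def drain_aligned(inputs: Dict[str, Deque[Record]]) -> List[Tuple[str, int, str]]:
    """Process buffered records across inputs in best-effort timestamp order.

    Each input queue is assumed to be ordered by timestamp already. On every
    step the input with the lowest head timestamp is selected, so a fast
    input cannot run ahead of a slow one while the slow one has data.
    """
    out = []
    while any(inputs.values()):
        name = min(
            (n for n, q in inputs.items() if q),
            key=lambda n: inputs[n][0][0],
        )
        out.append((name, *inputs[name].popleft()))
    return out


inputs = {
    "big": deque([(1, "a"), (2, "b"), (5, "e")]),
    "small": deque([(3, "c"), (4, "d")]),
}
processed = drain_aligned(inputs)
```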



[jira] [Commented] (FLINK-10348) Solve data skew when consuming data from kafka

2018-09-16 Thread Jiayi Liao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16616853#comment-16616853
 ] 

Jiayi Liao commented on FLINK-10348:


[~twalthr][~Zentol] What do you think of this feature?
