[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366056#comment-16366056
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5393


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
> Fix For: 1.5.0
>
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360377#comment-16360377
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tweise changes look good, will merge this to `master`.
Thanks a lot for the work!


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357972#comment-16357972
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai any update?


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356075#comment-16356075
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai done adding test

(CI failure BackPressureStatsTrackerITCase.testBackPressuredProducer 
unrelated to this PR)


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355190#comment-16355190
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tweise ok, I think I can agree on your last comment.

> Should I add a (trivial) unit test that asserts that 
isThisSubtaskShouldSubscribeTo applies modulus to assigner returned value that 
falls outside the subtask index range?

That sounds good! Once we add that, I think we're good to go.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355075#comment-16355075
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai Assignment and restore are orthogonal; this PR doesn't change 
restore. I therefore don't see the need to add another migration test for it. 
There is also no change to how the index is computed by default, it is just 
that part of what happened earlier in `isThisSubtaskShouldSubscribeTo` is now 
in the assigner. Should I add a (trivial) unit test that asserts that 
`isThisSubtaskShouldSubscribeTo` applies modulus to assigner returned value 
that falls outside the subtask index range?


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354988#comment-16354988
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r166523133
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

I only see references to the partitioner from the package 
org.apache.flink.streaming.connectors.kinesis and not from subpackages - which 
is what I pointed out before. Since you seem to suggest that isn't a 
convention, I will move the class (to me it is just an observation and not 
important).


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353705#comment-16353705
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tweise regarding what the proposed migration test is going to assert:

> The assigner does not influence how state is saved and restored. Even 
when the assigner returns invalid index, the modulus will ensure that the shard 
gets assigned.

This is exactly what I think the test is verifying, without any knowledge 
of what internally is happening. Using a different custom assigner across 
restores should not result in any state lose / change.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353703#comment-16353703
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r166254162
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

@tweise 
I think it is fine. For example, we have a `KinesisPartitioner` interface 
as the public API, and it lives under the same package as 
`FlinkKinesisConsumer`.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352526#comment-16352526
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai please see changes and couple questions.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351656#comment-16351656
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165834978
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

It would result in import from the parent package in KinesisDataFetcher. 
Looking at a few other classes it wasn't clear to me that this is the 
established pattern, so please confirm.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351646#comment-16351646
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165834292
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -214,6 +226,7 @@ protected KinesisDataFetcher(List streams,
this.totalNumberOfConsumerSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
this.indexOfThisConsumerSubtask = 
runtimeContext.getIndexOfThisSubtask();
this.deserializationSchema = 
checkNotNull(deserializationSchema);
+   this.shardAssigner = checkNotNull(shardAssigner);
--- End diff --

adding this in FlinkKinesisConsumer


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351270#comment-16351270
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai What is the proposed migration test going to assert? The assigner 
does not influence how state is saved and restored. Even when the assigner 
returns invalid index, the modulus will ensure that the shard gets assigned.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350040#comment-16350040
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588395
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

I think it makes sense to move this class to the same level as 
`FlinkKinesisConsumer`, since it is part of the public API.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350045#comment-16350045
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165590209
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
+* 
+* 1. Uniform distribution across subtasks
+* 2. Deterministic, calls for a given shard always return same 
index.
+* 
+*
+* The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, 
guaranteeing
+* that all partitions of a single topic will always be assigned to 
some subtask in a
+* uniformly distributed manner.
+*
+* Kinesis and the consumer support dynamic re-sharding and shard 
IDs, while sequential,
+* cannot be assumed to be consecutive. There is no perfect generic 
default assignment function.
+* Default subtask index assignment, which is based on hash code, may 
result in skew,
+* with some subtasks having many shards assigned and others none.
--- End diff --

I feel like this section of the Javadoc should be part of the Javadoc for 
the original consumer constructors, and should guide them to use the 
`setShardAssigner` method if the do encounter the case of serious shard skew.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350050#comment-16350050
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588489
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
--- End diff --

mentions Kafka partition, should be Kinesis shard


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350043#comment-16350043
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165587909
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
+* 
+* 1. Uniform distribution across subtasks
+* 2. Deterministic, calls for a given shard always return same 
index.
+* 
+*
+* The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, 
guaranteeing
+* that all partitions of a single topic will always be assigned to 
some subtask in a
+* uniformly distributed manner.
+*
+* Kinesis and the consumer support dynamic re-sharding and shard 
IDs, while sequential,
+* cannot be assumed to be consecutive. There is no perfect generic 
default assignment function.
+* Default subtask index assignment, which is based on hash code, may 
result in skew,
+* with some subtasks having many shards assigned and others none.
+*
+* It is recommended to monitor the shard distribution and adjust 
assignment appropriately.
+* Custom implementation may optimize the hash function or use static 
overrides to limit skew.
+ *
+ * @param shard the shard to determine
+ * @param numParallelSubtasks total number of subtasks
+ * @return index or hash code
--- End diff --

nit:
I think it would be more straightforward if we just say this returns the 
"target index".
If the index value is outside the subtask range, we perform a modulus 
operation.

So basically what the PR is already doing, just re-terming it to be more 
straightforward.
What do you think? This is also just a personal preference :)


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350046#comment-16350046
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165591410
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -192,6 +198,14 @@ public FlinkKinesisConsumer(List streams, 
KinesisDeserializationSchema `shardAssigner`


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350048#comment-16350048
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165589453
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
--- End diff --

The resulting distribution of shards "**should**" have the following 
contract.

i.e., we can't guarantee it, instead the user implementation should 
guarantee it.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350041#comment-16350041
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588295
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
--- End diff --

Add `@PublicEvolving` annotation, to make it clear this is a public API.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350039#comment-16350039
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165587359
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -182,13 +191,15 @@ public KinesisDataFetcher(List streams,

SourceFunction.SourceContext sourceContext,
RuntimeContext 
runtimeContext,
Properties configProps,
-   
KinesisDeserializationSchema deserializationSchema) {
+   
KinesisDeserializationSchema deserializationSchema,
+   KinesisShardAssigner 
kinesisShardToSubTaskIndexFn) {
--- End diff --

mismatching variable name: `kinesisShardToSubTaskIndexFn` --> 
`kinesisShardAssigner` or just `shardAssigner`


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350042#comment-16350042
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165586434
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -76,6 +77,9 @@
 @Internal
 public class KinesisDataFetcher {
 
+   public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER = 
(shard, subtasks) -> shard.hashCode();
+
+
--- End diff --

nit: unnecessary empty line


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350047#comment-16350047
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165594279
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -192,6 +198,14 @@ public FlinkKinesisConsumer(List streams, 
KinesisDeserializationSchema FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350044#comment-16350044
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165589748
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
+* 
+* 1. Uniform distribution across subtasks
+* 2. Deterministic, calls for a given shard always return same 
index.
+* 
+*
+* The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, 
guaranteeing
+* that all partitions of a single topic will always be assigned to 
some subtask in a
--- End diff --

of a single "**Kinesis stream**", not topic (which is Kafka terms)


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350037#comment-16350037
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588081
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
--- End diff --

nit: add one empty line above comment block (just a personal preference, 
though)


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350049#comment-16350049
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165590401
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
--- End diff --

The overview class Javadoc could probably be less generic.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350038#comment-16350038
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165586994
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -214,6 +226,7 @@ protected KinesisDataFetcher(List streams,
this.totalNumberOfConsumerSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
this.indexOfThisConsumerSubtask = 
runtimeContext.getIndexOfThisSubtask();
this.deserializationSchema = 
checkNotNull(deserializationSchema);
+   this.shardAssigner = checkNotNull(shardAssigner);
--- End diff --

We also need to try cleaning the closure of the given object (if it is a 
non-static inner class):
```
ClosureCleaner.clean(shardAssigner, true);
```


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349869#comment-16349869
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai the PR is ready for review now 


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349100#comment-16349100
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

GitHub user tweise reopened a pull request:

https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer

## What is the purpose of the change

Allow the user to customize Kinesis shard to subtask assignment in the 
Kinesis consumer.

## Brief change log

Added pluggable shard assigner.

## Verifying this change

Added unit test. 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

Javadoc

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-8516.shardHashing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5393.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5393


commit ad4dbe6fb5bf2af52726c54b6361089ef3f4e369
Author: Thomas Weise 
Date:   2018-01-31T01:44:44Z

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer




> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347000#comment-16347000
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165084048
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -584,17 +594,34 @@ private static ShardMetricsReporter 
registerShardMetrics(MetricGroup metricGroup
//  Miscellaneous utility functions
// 

 
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
+*/
+   public interface ShardToSubtaskIndexFn {
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
--- End diff --

This needs a much more stronger Javadoc explaining the contract of 
deterministic assignments.
See `KafkaTopicPartitionAssigner` for an example.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346998#comment-16346998
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165081173
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -584,17 +594,34 @@ private static ShardMetricsReporter 
registerShardMetrics(MetricGroup metricGroup
//  Miscellaneous utility functions
// 

 
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
+*/
+   public interface ShardToSubtaskIndexFn {
--- End diff --

I would prefer if this is a top-level class, instead of nested in 
`KinesisDataFetcher` (which is actually an internal class).

Also, the `Fn` name could also be polished a bit. Maybe something like 
`KinesisShardAssigner` would be better? That would also be coherent name-wise 
with the Kafka side, which has a `KafkaTopicPartitionAssigner` class (though it 
isn't exposed).


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346997#comment-16346997
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165081311
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -584,17 +594,34 @@ private static ShardMetricsReporter 
registerShardMetrics(MetricGroup metricGroup
//  Miscellaneous utility functions
// 

 
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
+*/
+   public interface ShardToSubtaskIndexFn {
--- End diff --

This also needs to be `Serializable`, since it'll be shipped along with the 
`JobGraph` to the JM.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347001#comment-16347001
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165083343
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -584,17 +594,34 @@ private static ShardMetricsReporter 
registerShardMetrics(MetricGroup metricGroup
//  Miscellaneous utility functions
// 

 
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
+*/
+   public interface ShardToSubtaskIndexFn {
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
+*
+* @param shard the shard to determine
+* @param totalNumberOfSubtasks total number of subtasks
+* @return index or hash code
+*/
+   // TODO: extra parameter can be eliminated by creating hash 
function after runtime context is present
+   int getSubTaskIndex(StreamShardHandle shard, int 
totalNumberOfSubtasks);
--- End diff --

I actually prefer passing the `totalNumberOfSubtasks` value independently, 
instead of passing in `runtimeContext`. IMO, it provides more context on the 
nature of the assignment.

Moreover, IMO, having a factory as the API is much more complicating for 
the user.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346999#comment-16346999
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165084584
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -93,6 +93,12 @@
/** User supplied deserialization schema to convert Kinesis byte 
messages to Flink objects. */
private final KinesisDeserializationSchema deserializer;
 
+   /**
+* The function that determines which subtask a shard should be 
assigned to.
+*/
+   // TODO: instead of the property, use a factory method that would allow 
subclass to access source context?
--- End diff --

Please see my comments below. 


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346257#comment-16346257
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r164952695
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -93,6 +93,12 @@
/** User supplied deserialization schema to convert Kinesis byte 
messages to Flink objects. */
private final KinesisDeserializationSchema deserializer;
 
+   /**
+* The function that determines which subtask a shard should be 
assigned to.
+*/
+   // TODO: instead of the property, use a factory method that would allow 
subclass to access source context?
--- End diff --

createFn(...) that will allow the function to be created with access to 
runtime context (like the number of subtasks), and then change the fn signature 
to only take shard metadata as parameter. Subclasses can override createFn, 
instead of having the property.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346124#comment-16346124
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise closed the pull request at:

https://github.com/apache/flink/pull/5393


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346121#comment-16346121
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer




*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, 

[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-26 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341450#comment-16341450
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8516:


Posting the mailing list discussion thread of the issue here, for easier 
reference:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Kinesis-consumer-shard-skew-FLINK-8516-td20843.html

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-26 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341449#comment-16341449
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8516:


[~thw] done, you can now assign tickets to yourself!
I'm also assuming you'd like to work on this issue, so I assigned it for you.

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-26 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341233#comment-16341233
 ] 

Thomas Weise commented on FLINK-8516:
-

Can a PMC please add me as contributor, thanks! CC: [~StephanEwen]

 

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-25 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340564#comment-16340564
 ] 

Thomas Weise commented on FLINK-8516:
-

Relevant piece of code:

[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L594]
{code:java}
public static boolean isThisSubtaskShouldSubscribeTo(StreamShardHandle shard,
int totalNumberOfConsumerSubtasks,
int indexOfThisConsumerSubtask) {
return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == 
indexOfThisConsumerSubtask;
}{code}
 

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)