> Yes, you are right that sorting and then assigning shard-subtask mappings
> would not have deterministic assignment.
> Non-deterministic assignments would cause issues when restoring the
> consumer state.
>

Isn't the issue that some shard assignments may not have been checkpointed
and so may end up in different subtasks when they are re-discovered?


> For now, a general solution would be to move away from distributing shards
> over subtasks in a round-robin fashion, but just use simple uniform hashing.
> This would avoid serious skew in specific Kinesis rescaling scenarios
> compared to the current solution, but in cases where Kinesis streams
> weren’t sharded at all, we would maybe not have a perfect distribution.
>

Below are a few examples of distribution with variation in hash function.

(Result is map numberOfShards -> numberOfSubTasksWithThatNumberOfShards)

#1 current hashing:

int hash = 17;
hash = 37 * hash + streamName.hashCode();
hash = 37 * hash + shardId.hashCode();

{0=21, 1=8, 2=2, 3=4, 4=3, 5=2, 6=5, 7=2, 8=5, 9=3, 10=3, 11=3, 12=3}

#2  Hashing.consistentHash

int hash = Hashing.consistentHash(shardId.hashCode(),
totalNumberOfConsumerSubtasks);
{0=1, 1=3, 2=9, 3=18, 4=11, 5=8, 6=7, 7=4, 8=2, 11=1}

#3 Hashing.murmur3_32()
int hash = hf.hashUnencodedChars(shardId).asInt();

{0=2, 1=5, 2=11, 3=9, 4=12, 5=12, 6=7, 8=4, 10=2}

#2 isn't perfect but closer to where we would like to be. And since there
is no silver bullet the user should be able to override the hashing.


The shard state in the FlinkKinesisConsumer is a Union list state, meaning
> that all consumer subtasks will see all shard states on restore.
> This should allow us to have different shard assignment logic when
> restoring.
>

Do you have a scenario in mind where we would not want to retain
checkpointed assignments?


Thanks,
Thomas

Reply via email to