assign a random shard id once in setup(), rather than each time in processElement().
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/236484b0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/236484b0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/236484b0

Branch: refs/heads/master
Commit: 236484b010a8e7b8bb6e6bc60c20ab7fced2b964
Parents: f4f6105
Author: Raghu Angadi <rang...@google.com>
Authored: Tue Aug 8 14:18:54 2017 -0700
Committer: Raghu Angadi <rang...@google.com>
Committed: Tue Oct 17 00:02:05 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/236484b0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 63dc734..78227a0 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1866,15 +1866,21 @@ public class KafkaIO {
    */
   private static class EOSReshard<K, V> extends DoFn<KV<K, V>, KV<Integer, KV<K, V>>> {
     private final int numShards;
+    private transient int shardId;
 
     EOSReshard(int numShards) {
       this.numShards = numShards;
     }
 
+    @Setup
+    public void setup() {
+      shardId = ThreadLocalRandom.current().nextInt(numShards);
+    }
+
     @ProcessElement
     public void processElement(ProcessContext ctx) {
-      int shard = ThreadLocalRandom.current().nextInt(numShards);
-      ctx.output(KV.of(shard, ctx.element()));
+      shardId = (shardId + 1) % numShards; // round-robin among shards.
+      ctx.output(KV.of(shardId, ctx.element()));
     }
   }
 
@@ -2196,7 +2202,7 @@ public class KafkaIO {
             //
             throw new IllegalStateException(String.format(
                 "Kafka metadata exists for shard %s, but there is no stored state for it. "
-                + "This mostly indicates groupId '%s' is already used else where or in earlier runs. "
+                + "This mostly indicates groupId '%s' is used else where or in earlier runs. "
                 + "Try another group id. Metadata for this shard on Kafka : '%s'",
                 shard, spec.getSinkGroupId(), committed.metadata()));
           }