Repository: beam Updated Branches: refs/heads/master f799a57af -> 09d131ced
[BEAM-1291] KafkaIO: don't log warning in offset fetcher while closing. add partition to the message. break when the reader is closed. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9396c5d4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9396c5d4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9396c5d4 Branch: refs/heads/master Commit: 9396c5d4dad5cdd38945a368f2ba9951a67bc9f4 Parents: f799a57 Author: Raghu Angadi <rang...@google.com> Authored: Fri Jan 20 13:24:09 2017 -0800 Committer: Sela <ans...@paypal.com> Committed: Sat Jan 21 16:57:03 2017 +0200 ---------------------------------------------------------------------- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9396c5d4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 735b8e7..2de2174 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1058,7 +1058,11 @@ public class KafkaIO { long offset = offsetConsumer.position(p.topicPartition); p.setLatestOffset(offset); } catch (Exception e) { - LOG.warn("{}: exception while fetching latest offsets. ignored.", this, e); + if (closed.get()) { + break; + } + LOG.warn("{}: exception while fetching latest offset for partition {}. will be retried.", + this, p.topicPartition, e); p.setLatestOffset(UNINITIALIZED_OFFSET); // reset }