Add basic metrics.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/99f55419
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/99f55419
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/99f55419

Branch: refs/heads/master
Commit: 99f554197ada52a8d628ade64523bd761a61075a
Parents: d9c95f0
Author: Raghu Angadi <rang...@google.com>
Authored: Wed Jul 26 14:29:11 2017 -0700
Committer: Raghu Angadi <rang...@google.com>
Committed: Tue Oct 17 00:02:04 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 30 +++++++++++++-------
 1 file changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/99f55419/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 9f5f493..654a137 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -76,6 +76,7 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -1905,6 +1906,10 @@ public class KafkaIO {
     private static final String OUT_OF_ORDER_BUFFER = "outOfOrderBuffer";
     private static final String WRITER_ID = "writerId";
+    private static final String METRIC_NAMESPACE = "KafkaEOSink";
+
+    // Not sure of a good limit. This applies only for large bundles.
+    private static final int MAX_RECORDS_PER_TXN = 1000;
 
     private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
 
     @StateId(NEXT_ID)
@@ -1928,9 +1933,11 @@ public class KafkaIO {
         new HashMap<>();
 
     // TODO: Need a way to close producers that are no longer relevant (may be have a timeout?).
-    // This would mainly matter only in batch pipelines. Could be configurable.
-    private static final int MAX_RECORDS_PER_TXN = 1000;
-
+    // Metrics
+    private final Counter elementsWritten = SinkMetrics.elementsWritten();
+    // Elements buffered due to out of order arrivals.
+    private final Counter elementsBuffered = Metrics.counter(METRIC_NAMESPACE, "elementsBuffered");
+    private final Counter numTransactions = Metrics.counter(METRIC_NAMESPACE, "numTransactions");
 
     KafkaEOWriter(Write<K, V> spec) {
       this.spec = spec;
@@ -1995,23 +2002,24 @@
             oooBufferState.add(kv);
             minBufferedId = Math.min(minBufferedId, recordId);
             minBufferedIdState.write(minBufferedId);
+            elementsBuffered.inc();
             continue;
           }
 
           // recordId and nextId match. Finally write record.
 
-          writer.sendRecord(kv.getValue());
+          writer.sendRecord(kv.getValue(), elementsWritten);
           nextId++;
 
           if (++txnSize >= MAX_RECORDS_PER_TXN) {
-            writer.commitTxn(recordId);
+            writer.commitTxn(recordId, numTransactions);
             txnSize = 0;
             writer.beginTxn();
           }
 
           if (minBufferedId == nextId) {
             // One or more of the buffered records can be committed now.
-            // Read all the buffered records in to memory and sort them. Reading into memory
+            // Read all of them in to memory and sort them. Reading into memory
             // might be problematic in extreme cases. Might need to improve it in future.
             List<KV<Long, KV<K, V>>> buffered = Lists.newArrayList(oooBufferState.read());
@@ -2029,7 +2037,7 @@
           }
         }
 
-        writer.commitTxn(nextId - 1);
+        writer.commitTxn(nextId - 1, numTransactions);
         nextIdState.write(nextId);
       } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
         // JavaDoc says these are not recoverable errors and producer should be closed.
@@ -2055,7 +2063,7 @@
       @JsonProperty("id")
       public final String writerId;
 
-      private ShardMetadata() { // for json
+      private ShardMetadata() { // for json deserializer
        sequenceId = -1;
        writerId = null;
      }
@@ -2092,17 +2100,18 @@
        producer.beginTransaction();
      }
 
-      void sendRecord(KV<K, V> record) {
+      void sendRecord(KV<K, V> record, Counter sendCounter) {
        try {
          producer.send(
              new ProducerRecord<>(spec.getTopic(), record.getKey(), record.getValue()));
+          sendCounter.inc();
        } catch (KafkaException e) {
          producer.abortTransaction();
          throw e;
        }
      }
 
-      void commitTxn(long lastRecordId) throws IOException {
+      void commitTxn(long lastRecordId, Counter numTransactions) throws IOException {
        try {
          // Store id in consumer group metadata for the partition.
          // NOTE: Kafka keeps this metadata for 24 hours since the last update. This limits
@@ -2116,6 +2125,7 @@
              spec.getSinkGroupId());
 
          producer.commitTransaction();
+          numTransactions.inc();
 
          LOG.info("{} : committed {} records", shard, lastRecordId - committedId);
          committedId = lastRecordId;
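
For readers following along (not part of the commit): the "KafkaEOSink"-namespaced counters added here (elementsBuffered, numTransactions) can be queried from the PipelineResult once the pipeline finishes, via Beam's metrics query API. A minimal sketch, assuming a hypothetical Pipeline `p` that applies this exactly-once Kafka sink and a runner that reports metrics:

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.MetricNameFilter;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;

    // ... construct `p` and apply the exactly-once Kafka sink ...
    PipelineResult result = p.run();
    result.waitUntilFinish();

    // Restrict the query to the namespace added in this commit.
    MetricQueryResults metrics =
        result.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.inNamespace("KafkaEOSink"))
                .build());

    for (MetricResult<Long> counter : metrics.getCounters()) {
      // getAttempted() is the most portable value; getCommitted() can throw
      // UnsupportedOperationException on runners without committed-metrics support.
      System.out.println(counter.getName() + " = " + counter.getAttempted());
    }

Metric cells are identified by (namespace, name), so these counters appear under the "KafkaEOSink" namespace, while elementsWritten (from SinkMetrics.elementsWritten()) is reported under the SDK's standard sink metrics names.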