Add basic metrics.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/99f55419
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/99f55419
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/99f55419

Branch: refs/heads/master
Commit: 99f554197ada52a8d628ade64523bd761a61075a
Parents: d9c95f0
Author: Raghu Angadi <rang...@google.com>
Authored: Wed Jul 26 14:29:11 2017 -0700
Committer: Raghu Angadi <rang...@google.com>
Committed: Tue Oct 17 00:02:04 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 30 +++++++++++++-------
 1 file changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
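
For context, the change uses two flavors of Beam metrics: the predefined
sink counter SinkMetrics.elementsWritten(), and user-defined counters
created with Metrics.counter(namespace, name) under the "KafkaEOSink"
namespace. A minimal sketch of that pattern (the CountingFn class and the
"MyNamespace"/"myCounter" names below are illustrative, not part of this
commit):

----------------------------------------------------------------------
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.transforms.DoFn;

// Counters are created once per DoFn instance and incremented from
// processElement(); the runner aggregates them per step.
class CountingFn<T> extends DoFn<T, T> {
  // Predefined counter, reported under Beam's standard sink metrics.
  private final Counter written = SinkMetrics.elementsWritten();
  // User-defined counter under a custom namespace, as with "KafkaEOSink".
  private final Counter custom = Metrics.counter("MyNamespace", "myCounter");

  @ProcessElement
  public void processElement(ProcessContext c) {
    written.inc();
    custom.inc();
    c.output(c.element());
  }
}
----------------------------------------------------------------------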


http://git-wip-us.apache.org/repos/asf/beam/blob/99f55419/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 9f5f493..654a137 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -76,6 +76,7 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -1905,6 +1906,10 @@ public class KafkaIO {
     private static final String OUT_OF_ORDER_BUFFER = "outOfOrderBuffer";
     private static final String WRITER_ID = "writerId";
 
+    private static final String METRIC_NAMESPACE = "KafkaEOSink";
+
+    // Not sure of a good limit. This applies only for large bundles.
+    private static final int MAX_RECORDS_PER_TXN = 1000;
     private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
 
     @StateId(NEXT_ID)
@@ -1928,9 +1933,11 @@ public class KafkaIO {
         new HashMap<>();
    // TODO: Need a way to close producers that are no longer relevant (maybe have a timeout?).
 
-    // This would mainly matter only in batch pipelines. Could be configurable.
-    private static final int MAX_RECORDS_PER_TXN = 1000;
-
+    // Metrics
+    private final Counter elementsWritten = SinkMetrics.elementsWritten();
+    // Elements buffered due to out of order arrivals.
+    private final Counter elementsBuffered = Metrics.counter(METRIC_NAMESPACE, "elementsBuffered");
+    private final Counter numTransactions = Metrics.counter(METRIC_NAMESPACE, "numTransactions");
 
     KafkaEOWriter(Write<K, V> spec) {
       this.spec = spec;
@@ -1995,23 +2002,24 @@ public class KafkaIO {
             oooBufferState.add(kv);
             minBufferedId = Math.min(minBufferedId, recordId);
             minBufferedIdState.write(minBufferedId);
+            elementsBuffered.inc();
             continue;
           }
 
           // recordId and nextId match. Finally write record.
 
-          writer.sendRecord(kv.getValue());
+          writer.sendRecord(kv.getValue(), elementsWritten);
           nextId++;
 
           if (++txnSize >= MAX_RECORDS_PER_TXN) {
-            writer.commitTxn(recordId);
+            writer.commitTxn(recordId, numTransactions);
             txnSize = 0;
             writer.beginTxn();
           }
 
           if (minBufferedId == nextId) {
             // One or more of the buffered records can be committed now.
-            // Read all the buffered records in to memory and sort them. Reading into memory
+            // Read all of them in to memory and sort them. Reading into memory
             // might be problematic in extreme cases. Might need to improve it in future.
 
             List<KV<Long, KV<K, V>>> buffered = Lists.newArrayList(oooBufferState.read());
@@ -2029,7 +2037,7 @@ public class KafkaIO {
           }
         }
 
-        writer.commitTxn(nextId - 1);
+        writer.commitTxn(nextId - 1, numTransactions);
         nextIdState.write(nextId);
       } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
         // JavaDoc says these are not recoverable errors and producer should be closed.
@@ -2055,7 +2063,7 @@ public class KafkaIO {
       @JsonProperty("id")
       public final String writerId;
 
-      private ShardMetadata() { // for json
+      private ShardMetadata() { // for json deserializer
         sequenceId = -1;
         writerId = null;
       }
@@ -2092,17 +2100,18 @@ public class KafkaIO {
         producer.beginTransaction();
       }
 
-      void sendRecord(KV<K, V> record) {
+      void sendRecord(KV<K, V> record, Counter sendCounter) {
         try {
           producer.send(
              new ProducerRecord<>(spec.getTopic(), record.getKey(), record.getValue()));
+          sendCounter.inc();
         } catch (KafkaException e) {
           producer.abortTransaction();
           throw e;
         }
       }
 
-      void commitTxn(long lastRecordId) throws IOException {
+      void commitTxn(long lastRecordId, Counter numTransactions) throws IOException {
         try {
           // Store id in consumer group metadata for the partition.
          // NOTE: Kafka keeps this metadata for 24 hours since the last update. This limits
@@ -2116,6 +2125,7 @@ public class KafkaIO {
               spec.getSinkGroupId());
           producer.commitTransaction();
 
+          numTransactions.inc();
          LOG.info("{} : committed {} records", shard, lastRecordId - committedId);
 
           committedId = lastRecordId;

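Usage note (not part of the commit): once a pipeline using this sink
runs, the new counters can be read back through the standard Beam
MetricResults query API. A sketch, assuming a runner that supports
metrics querying; the MetricsCheck class below is hypothetical:

----------------------------------------------------------------------
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

class MetricsCheck {
  // Print the sink's transaction counter from a pipeline result.
  static void printNumTransactions(PipelineResult result) {
    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(
                MetricNameFilter.named("KafkaEOSink", "numTransactions"))
            .build());
    for (MetricResult<Long> counter : metrics.getCounters()) {
      System.out.println(counter.getName() + " = " + counter.getAttempted());
    }
  }
}
----------------------------------------------------------------------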