METRON-1782 Add Kafka Partition and Offset to Profiler Debug Logs (nickwallen) closes apache/metron#1202
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/9c9e2954
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/9c9e2954
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/9c9e2954

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 9c9e29547837e648723920329c3b7fea6211f0db
Parents: 1723a0e
Author: nickwallen <n...@nickallen.org>
Authored: Mon Oct 1 09:07:28 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Mon Oct 1 09:07:28 2018 -0400

----------------------------------------------------------------------
 .../src/main/flux/profiler/remote.yaml | 11 +++++++++--
 .../metron/profiler/storm/ProfileSplitterBolt.java | 13 ++++++++++++-
 .../metron/profiler/storm/ProfileSplitterBoltTest.java | 3 ++-
 .../storm/kafka/flux/SimpleStormKafkaBuilder.java | 4 +++-
 4 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
index da71b27..e16a782 100644
--- a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
@@ -77,8 +77,15 @@ components:
       className: "java.util.ArrayList"
       configMethods:
         - name: "add"
-          args:
-            - "value"
+          args: ["value"]
+        - name: "add"
+          args: ["topic"]
+        - name: "add"
+          args: ["partition"]
+        - name: "add"
+          args: ["offset"]
+        - name: "add"
+          args: ["timestamp"]

     - id: "kafkaConfig"
       className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
index 81179b6..ef58ad9 100644
--- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
@@ -43,6 +43,12 @@ import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;

+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.OFFSET;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.PARTITION;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.TIMESTAMP;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.TOPIC;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.VALUE;
+
 /**
  * The Storm bolt responsible for filtering incoming messages and directing
  * each to the downstream bolts responsible for building a Profile.
@@ -132,6 +138,11 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   @Override
   public void execute(Tuple input) {
     try {
+      LOG.debug("Received message; topic={}, partition={}, offset={}, kafkaTimestamp={}",
+              input.contains(TOPIC.getFieldName()) ? input.getStringByField(TOPIC.getFieldName()): "unknown",
+              input.contains(PARTITION.getFieldName()) ? input.getIntegerByField(PARTITION.getFieldName()): "unknown",
+              input.contains(OFFSET.getFieldName()) ?
input.getLongByField(OFFSET.getFieldName()): "unknown",
+              input.contains(TIMESTAMP.getFieldName()) ? input.getLongByField(TIMESTAMP.getFieldName()): "unknown");
       doExecute(input);

     } catch (Throwable t) {
@@ -146,7 +157,7 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   private void doExecute(Tuple input) throws ParseException, UnsupportedEncodingException {

     // retrieve the input message
-    byte[] data = input.getBinary(0);
+    byte[] data = input.getBinaryByField(VALUE.getFieldName());
     if(data == null) {
       LOG.debug("Received null message. Nothing to do.");
       return;

http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
index 93d2ac4..360ef4b 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
@@ -39,6 +39,7 @@ import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.HashMap;

+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.VALUE;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
@@ -219,7 +220,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
     message = (JSONObject) parser.parse(input);

     // ensure the tuple returns the expected json message
-    when(tuple.getBinary(0)).thenReturn(input.getBytes());
+    when(tuple.getBinaryByField(VALUE.getFieldName())).thenReturn(input.getBytes());
   }

   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
index f99e549..0aebff8 100644
--- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
@@ -57,7 +57,9 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V
     KEY("key", record -> record.key()),
     VALUE("value", record -> record.value()),
     PARTITION("partition", record -> record.partition()),
-    TOPIC("topic", record -> record.topic())
+    OFFSET("offset", record -> record.offset()),
+    TOPIC("topic", record -> record.topic()),
+    TIMESTAMP("timestamp", record -> record.timestamp())
     ;
     String fieldName;
     Function<ConsumerRecord,Object> recordExtractor;