[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504792#comment-16504792
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user ottobackwards commented on the issue:

https://github.com/apache/metron/pull/1045
  
I assume you are talking to @nickwallen there @mmiklavc ?


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504758#comment-16504758
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user mmiklavc commented on the issue:

https://github.com/apache/metron/pull/1045
  
I didn't know you were still actively reviewing. The last comment I 
received besides a +1 was "that looks great, thanks." We do have unit and 
integration tests around all of this in addition to the batch performance 
testing performed on enrichments. They use the same KafkaWriter.

Below is verbatim from our bylaws. This is also the handling that I've 
personally seen on most PRs that don't have dissenting/outstanding concerns to 
be addressed.

> Code Change
> 
> A change made to a codebase of the project requires lazy consensus of 
active committers other than the author of the patch. The code can be committed 
after the first +1.


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504724#comment-16504724
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user ottobackwards commented on the issue:

https://github.com/apache/metron/pull/1045
  
My comment was just about calling out a possible need for more shutdown 
orchestration.
I am not reviewing.


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504718#comment-16504718
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user nickwallen commented on the issue:

https://github.com/apache/metron/pull/1045
  
I was still going through it.  Not sure where @ottobackwards landed on 
this.  

Besides the open TODO comment, I am not sure how much testing we did around 
the Profiler or testing changes made to the new configuration elements that you 
added.  I didn't see a test plan around much of this besides spinning up Full 
Dev, which doesn't exercise the Profiler.

I guess its just a community courtesy that we've followed, no matter how 
long its open.  Kind of like how I followed up on #1036.  I don't want to make 
a stink of it.  Like I said, its just a courtesy I suppose.  


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504697#comment-16504697
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user mmiklavc commented on the issue:

https://github.com/apache/metron/pull/1045
  
@nickwallen The PR has been up for 7 days, I addressed all community 
comments days ago, and no comments appeared to be dissenting. Was there a 
concern or issue you had before this was merged?


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504658#comment-16504658
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user nickwallen commented on the issue:

https://github.com/apache/metron/pull/1045
  
@mmiklavc FYI You've got a TODO comment in `ConfiguredIndexingBolt.java` 
that seems like something you wanted to address before merging.

Usually good to at least give a chance for all those who have chimed in on 
a PR to sign-off before merging it in.  At least that's what I try to do as a 
courtesy. 




> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503948#comment-16503948
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user asfgit closed the pull request at:

https://github.com/apache/metron/pull/1045


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503844#comment-16503844
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user cestella commented on the issue:

https://github.com/apache/metron/pull/1045
  
+1 by inspection.  I ran this up in full-dev and data flowed through just 
fine.


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503770#comment-16503770
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user mmiklavc commented on the issue:

https://github.com/apache/metron/pull/1045
  
Went through a few variations of testing enrichments using the new 
KafkaWriter bulk message writer implementation with @anandsubbu and we are 
seeing results close to the numbers we see in master and prior versions. There 
is ever slight degradation (went from I believe 36k EPS in our setup for 1 
enrichment to 33k EPS), however we are no longer at risk of dropping data.

For posterity purposes, the new implementation can be summarized as follows:
- kafka producer has a number of settings for batching available via 
Kafka's client ProducerConfigs class. We are not manipulating these properties 
directly via the exposed config because we want to control batching in order to 
have the ability to have guarantees with Storm's acking mechanism.
- The 
[BulkWriterComponent](https://github.com/apache/metron/blob/master/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java)
 class manages batching in terms of number of records and timeout. It's 
important to note that the KafkaProducer batches in terms of size in bytes 
whereas the writer component manages batches in terms or number of records, 
regardless of size. Previously, KafkaWriter was processing messages 1-by-1, 
asynchronously, and leveraging the KafkaProducer defaults. KafkaWriter now is a 
proper BulkMessageWriter that can handle sending multiple messages to Kafka by 
processing the Futures returned by the `public 
java.util.concurrent.Future send(ProducerRecord record)` 
method. We call `flush()` to ensure all Futures have been processed before 
returning their status back to the calling class.
- We tweaked the default slightly under the hood for the Kafka Producer 
batch size - it was originally 16KB and our performance tests found the setting 
to be optimal around 64KB. This has been set as a default constant in the 
writer as we want users to leverage the batchSize and batchTimeout settings 
exposed via the standard Metron configuration.
- It is possible, though not advised, to override the KafkaProducer 
batch.size and batch.timeout settings by adding the property to the Kafka 
producer config map in Flux or via parser properties. This should only be done 
as a matter of optimizing the Kafka batching caches as the producer will always 
allocate 64KB regardless of being full or not. The settings exposed in Metron 
for batchSize and batchTimeout should be sufficient, and again, our performance 
tests suggest that the defaults should be sufficient. As usual, adjusting spout 
timeouts and spout and bolt parallelism will provide the necessary scaling 
knobs and levers.


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502734#comment-16502734
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user mmiklavc commented on a diff in the pull request:

https://github.com/apache/metron/pull/1045#discussion_r193270320
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
 ---
@@ -156,33 +172,61 @@ public void configure(String sensorName, 
WriterConfiguration configuration) {
 }
   }
 
+  @Override
+  public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration config)
+  throws Exception {
+if(this.zkQuorum != null && this.brokerUrl == null) {
+  try {
+this.brokerUrl = 
Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
+  } catch (Exception e) {
+throw new IllegalStateException("Cannot read kafka brokers from 
zookeeper and you didn't specify them, giving up!", e);
+  }
+}
+this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
+  }
+
   public Map createProducerConfigs() {
 Map producerConfig = new HashMap<>();
 producerConfig.put("bootstrap.servers", brokerUrl);
 producerConfig.put("key.serializer", keySerializer);
 producerConfig.put("value.serializer", valueSerializer);
 producerConfig.put("request.required.acks", requiredAcks);
+producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, 
DEFAULT_BATCH_SIZE);
 producerConfig.putAll(producerConfigs == null?new 
HashMap<>():producerConfigs);
 producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
 return producerConfig;
   }
 
   @Override
-  public void init() {
-if(this.zkQuorum != null && this.brokerUrl == null) {
+  public BulkWriterResponse write(String sensorType, WriterConfiguration 
configurations,
+  Iterable tuples, List messages) {
+BulkWriterResponse writerResponse = new BulkWriterResponse();
+
+List> results = new ArrayList<>();
+int i = 0;
+for (Tuple tuple : tuples) {
+  JSONObject message = messages.get(i++);
+  Future future = kafkaProducer
+  .send(new ProducerRecord(kafkaTopic, 
message.toJSONString()));
--- End diff --

Hey @nickwallen - check out this recent commit and let me know what you 
think. I added a try/catch for a Throwable on serializing the JSON, add it as 
an error, and continue the loop. Does that look better? I'm hoping we don't 
have situations where that toJSON method will throw exceptions, but as you 
mentioned, if it does, now it won't kill the whole batch.


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501832#comment-16501832
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1045#discussion_r193081934
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
 ---
@@ -156,33 +172,61 @@ public void configure(String sensorName, 
WriterConfiguration configuration) {
 }
   }
 
+  @Override
+  public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration config)
+  throws Exception {
+if(this.zkQuorum != null && this.brokerUrl == null) {
+  try {
+this.brokerUrl = 
Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
+  } catch (Exception e) {
+throw new IllegalStateException("Cannot read kafka brokers from 
zookeeper and you didn't specify them, giving up!", e);
+  }
+}
+this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
+  }
+
   public Map createProducerConfigs() {
 Map producerConfig = new HashMap<>();
 producerConfig.put("bootstrap.servers", brokerUrl);
 producerConfig.put("key.serializer", keySerializer);
 producerConfig.put("value.serializer", valueSerializer);
 producerConfig.put("request.required.acks", requiredAcks);
+producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, 
DEFAULT_BATCH_SIZE);
 producerConfig.putAll(producerConfigs == null?new 
HashMap<>():producerConfigs);
 producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
 return producerConfig;
   }
 
   @Override
-  public void init() {
-if(this.zkQuorum != null && this.brokerUrl == null) {
+  public BulkWriterResponse write(String sensorType, WriterConfiguration 
configurations,
+  Iterable tuples, List messages) {
+BulkWriterResponse writerResponse = new BulkWriterResponse();
+
+List> results = new ArrayList<>();
+int i = 0;
+for (Tuple tuple : tuples) {
+  JSONObject message = messages.get(i++);
+  Future future = kafkaProducer
+  .send(new ProducerRecord(kafkaTopic, 
message.toJSONString()));
--- End diff --


> I've made a conscious decision in this PR to not make any changes to how 
we go about managing serialization and deserialization. 

I am not suggesting that we change how we do serialization.  I think we 
need to wrap the `message.toJSONString()` in a try/catch, so that an exception 
during serialization is handled as an error and added to the 
`BulkWriterResponse`, just like the other errors that we handle on lines 218 
and 226.  

Right now, an error on one tuple will kill the whole batch.  I would think 
we would want to handle all errors in the same way.  


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16500805#comment-16500805
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user mmiklavc commented on a diff in the pull request:

https://github.com/apache/metron/pull/1045#discussion_r192868140
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
 ---
@@ -156,33 +172,61 @@ public void configure(String sensorName, 
WriterConfiguration configuration) {
 }
   }
 
+  @Override
+  public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration config)
+  throws Exception {
+if(this.zkQuorum != null && this.brokerUrl == null) {
+  try {
+this.brokerUrl = 
Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
+  } catch (Exception e) {
+throw new IllegalStateException("Cannot read kafka brokers from 
zookeeper and you didn't specify them, giving up!", e);
+  }
+}
+this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
+  }
+
   public Map createProducerConfigs() {
 Map producerConfig = new HashMap<>();
 producerConfig.put("bootstrap.servers", brokerUrl);
 producerConfig.put("key.serializer", keySerializer);
 producerConfig.put("value.serializer", valueSerializer);
 producerConfig.put("request.required.acks", requiredAcks);
+producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, 
DEFAULT_BATCH_SIZE);
 producerConfig.putAll(producerConfigs == null?new 
HashMap<>():producerConfigs);
 producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
 return producerConfig;
   }
 
   @Override
-  public void init() {
-if(this.zkQuorum != null && this.brokerUrl == null) {
+  public BulkWriterResponse write(String sensorType, WriterConfiguration 
configurations,
+  Iterable tuples, List messages) {
+BulkWriterResponse writerResponse = new BulkWriterResponse();
+
+List> results = new ArrayList<>();
+int i = 0;
+for (Tuple tuple : tuples) {
+  JSONObject message = messages.get(i++);
+  Future future = kafkaProducer
+  .send(new ProducerRecord(kafkaTopic, 
message.toJSONString()));
--- End diff --

It's effectively a HashMap, since that's what the JSONObject class extends. 
I think the writer should basically write out whatever has been passed to it, 
meaning that the responsibility would be on the invoking class to sort out 
whether the HashMap has been constructed correctly in order to properly 
serialize into a JSON string. I've made a conscious decision in this PR to not 
make any changes to how we go about managing  serialization and 
deserialization. Are you worried about security threats, or just garbage data 
coming from parsers and enrichments, potentially?


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16500486#comment-16500486
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user mmiklavc commented on the issue:

https://github.com/apache/metron/pull/1045
  
@ottobackwards I believe this should already be handled by the acking 
strategy. Anything not acked will be replayed since we're leveraging at least 
once message processing semantics.


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499067#comment-16499067
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user ottobackwards commented on the issue:

https://github.com/apache/metron/pull/1045
  
Maybe we need some kind of orchestration service that you use to shutdown 
metron without losing things in the pipeline already


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16498693#comment-16498693
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user nickwallen commented on the issue:

https://github.com/apache/metron/pull/1045
  
> This might actually explain why some of the topologies just won't die 
sometimes.

The more I think about it, this is very likely what is happening in the 
current code base when topologies just won't die. The queue builds a giant 
backlog, then it will block forever trying to clear the queue.  

But with these code changes, we should not carry any messages in the 
internal Kafka producer queue between calls to `KafkaWriter.write`.  So this 
*should* no longer be a problem; me thinks.


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16498611#comment-16498611
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user nickwallen commented on the issue:

https://github.com/apache/metron/pull/1045
  
I noticed that we should probably use a timeout when we call 
`KafkaProducer.close`.  If we get a huge backlog of messages, it will just 
block until the backlog is cleared.  We can get in situations where we build-up 
such a large backlog that there just is no hope of clearing it. This might 
actually explain why some of the topologies just won't die sometimes.


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16498590#comment-16498590
 ] 

ASF GitHub Bot commented on METRON-1594:


Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1045#discussion_r192517781
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
 ---
@@ -156,33 +172,61 @@ public void configure(String sensorName, 
WriterConfiguration configuration) {
 }
   }
 
+  @Override
+  public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration config)
+  throws Exception {
+if(this.zkQuorum != null && this.brokerUrl == null) {
+  try {
+this.brokerUrl = 
Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
+  } catch (Exception e) {
+throw new IllegalStateException("Cannot read kafka brokers from 
zookeeper and you didn't specify them, giving up!", e);
+  }
+}
+this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
+  }
+
   public Map createProducerConfigs() {
 Map producerConfig = new HashMap<>();
 producerConfig.put("bootstrap.servers", brokerUrl);
 producerConfig.put("key.serializer", keySerializer);
 producerConfig.put("value.serializer", valueSerializer);
 producerConfig.put("request.required.acks", requiredAcks);
+producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, 
DEFAULT_BATCH_SIZE);
 producerConfig.putAll(producerConfigs == null?new 
HashMap<>():producerConfigs);
 producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
 return producerConfig;
   }
 
   @Override
-  public void init() {
-if(this.zkQuorum != null && this.brokerUrl == null) {
+  public BulkWriterResponse write(String sensorType, WriterConfiguration 
configurations,
+  Iterable tuples, List messages) {
+BulkWriterResponse writerResponse = new BulkWriterResponse();
+
+List> results = new ArrayList<>();
+int i = 0;
+for (Tuple tuple : tuples) {
+  JSONObject message = messages.get(i++);
+  Future future = kafkaProducer
+  .send(new ProducerRecord(kafkaTopic, 
message.toJSONString()));
--- End diff --

Should we be more defensive when we transform the message to JSON? 
Considering the broad use of this class, someone might inject something that 
causes problems during serialization.


> KafkaWriter is asynchronous and may lose data on node failure
> -
>
> Key: METRON-1594
> URL: https://issues.apache.org/jira/browse/METRON-1594
> Project: Metron
>  Issue Type: Bug
>Reporter: Michael Miklavcic
>Assignee: Michael Miklavcic
>Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do 
> not batch. Unfortunately, the send call is asynchronous, so if the node dies 
> before the data is actually sent from kafka then it will drop data. It is 
> highly unlikely that we will be able to make kafkawriter synchronous in a 
> performant way, so we will likely need to batch in the parser and enrichment 
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (METRON-1594) KafkaWriter is asynchronous and may lose data on node failure

2018-05-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16497166#comment-16497166
 ] 

ASF GitHub Bot commented on METRON-1594:


GitHub user mmiklavc opened a pull request:

https://github.com/apache/metron/pull/1045

METRON-1594: KafkaWriter is asynchronous and may lose data on node failure

## Contributor Comments

https://issues.apache.org/jira/browse/METRON-1594

This covers the work to convert the KafkaWriter from a basic MessageWriter 
to a BulkMessageWriter in order to address making producer.send() synchronous. 
This impacts: parsers, enrichment, indexing (error topic output), and profiler. 
Anything previously using the KafkaWriter as a single-record MessageWriter has 
been converted over to using it as a BulkMessageWriter.

Other:

- ParserBolt was lacking Tick Tuples so I've ported over the functionality 
almost verbatim from the BulkMessageWriterBolt.
- BulkMessageWriterBolt has been generalized. Previously, it was extending 
ConfiguredIndexingBolt in both enrichments and indexing.
- Configuring batchSize and batchTimeout for the BulkWriterComponent that 
wraps BulkMessageWriter(s):
  - parsers stays the same, default updated to 15 (unless/until perf 
testing suggests a different default more suitable)
  - enrichment - pulls from global config: `enrichment.writer.batchSize` 
and `enrichment.writer.batchTimeout`
  - indexing - stays the same except for the error output to kafka topic, 
which has been set to batch size 15 by default
  - profiler - Added a ProfilerWriterConfiguration to match the pattern 
used by other writers. Pulls from global config: `profiler.writer.batchSize` 
and `profiler.writer.batchTimeout`

Tested in full dev and data flows into the ES indexes. Currently undergoing 
performance testing to establish a proper baseline batch size that does not 
result in performance regressions.

## Pull Request Checklist

Thank you for submitting a contribution to Apache Metron.  
Please refer to our [Development 
Guidelines](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61332235)
 for the complete guide to follow for contributions.  
Please refer also to our [Build Verification 
Guidelines](https://cwiki.apache.org/confluence/display/METRON/Verifying+Builds?show-miniview)
 for complete smoke testing guides.  


In order to streamline the review of the contribution we ask you follow 
these guidelines and ask you to double check the following:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? If not one needs to 
be created at [Metron 
Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
- [x] Does your PR title start with METRON- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?


### For code changes:
- [ ] Have you included steps to reproduce the behavior or problem that is 
being changed or addressed?
- [ ] Have you included steps or a guide to how the change may be verified 
and tested manually?
- [ ] Have you ensured that the full suite of tests and checks have been 
executed in the root metron folder via:
  ```
  mvn -q clean integration-test install && 
dev-utilities/build-utils/verify_licenses.sh 
  ```

- [x] Have you written or updated unit tests and or integration tests to 
verify your changes?
- [x] Have you verified the basic functionality of the build by building 
and running locally with Vagrant full-dev environment or the equivalent?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered by building and verifying the site-book? If not then run 
the following commands and the verify changes via 
`site-book/target/site/index.html`:

  ```
  cd site-book
  mvn site
  ```

 Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.
It is also recommended that [travis-ci](https://travis-ci.org) is set up 
for your personal repository such that your branches are built there before 
submitting a pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mmiklavc/metron kafka-writer-synchro

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/metron/pull/1045.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This