[ https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16501832#comment-16501832 ]

ASF GitHub Bot commented on METRON-1594:
----------------------------------------

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1045#discussion_r193081934
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -156,33 +172,61 @@ public void configure(String sensorName, WriterConfiguration configuration) {
         }
       }
     
    +  @Override
    +  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config)
    +      throws Exception {
    +    if(this.zkQuorum != null && this.brokerUrl == null) {
    +      try {
    +        this.brokerUrl = Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
    +      } catch (Exception e) {
    +        throw new IllegalStateException("Cannot read kafka brokers from zookeeper and you didn't specify them, giving up!", e);
    +      }
    +    }
    +    this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
    +  }
    +
       public Map<String, Object> createProducerConfigs() {
         Map<String, Object> producerConfig = new HashMap<>();
         producerConfig.put("bootstrap.servers", brokerUrl);
         producerConfig.put("key.serializer", keySerializer);
         producerConfig.put("value.serializer", valueSerializer);
         producerConfig.put("request.required.acks", requiredAcks);
    +    producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, DEFAULT_BATCH_SIZE);
         producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs);
         producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
         return producerConfig;
       }
     
       @Override
    -  public void init() {
    -    if(this.zkQuorum != null && this.brokerUrl == null) {
    +  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations,
    +      Iterable<Tuple> tuples, List<JSONObject> messages) {
    +    BulkWriterResponse writerResponse = new BulkWriterResponse();
    +
    +    List<Map.Entry<Tuple, Future>> results = new ArrayList<>();
    +    int i = 0;
    +    for (Tuple tuple : tuples) {
    +      JSONObject message = messages.get(i++);
    +      Future future = kafkaProducer
    +          .send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString()));
    --- End diff --
    
    
    > I've made a conscious decision in this PR to not make any changes to how we go about managing serialization and deserialization.
    
    I am not suggesting that we change how we do serialization. I think we need to wrap `message.toJSONString()` in a try/catch so that an exception during serialization is handled as an error and added to the `BulkWriterResponse`, just like the other errors we handle on lines 218 and 226.
    
    Right now, an error on one tuple will kill the whole batch. I would think we want to handle all errors the same way.
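A minimal, dependency-free sketch of the per-tuple handling suggested above: serialization is wrapped in a try/catch inside the loop, so a failure is recorded against that one tuple and the rest of the batch still goes out. Everything here is hypothetical: string ids stand in for Storm `Tuple`s, `WriterResponse` stands in for Metron's `BulkWriterResponse`, a `Function` stands in for `message.toJSONString()`, and a plain list stands in for the Kafka producer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: no Storm or Kafka dependencies; all types are stand-ins.
final class PerTupleSerialization {

  // Hypothetical stand-in for Metron's BulkWriterResponse.
  static final class WriterResponse {
    final List<String> handedOff = new ArrayList<>();            // reached the (stand-in) producer
    final Map<String, Throwable> errors = new LinkedHashMap<>(); // tuple id -> serialization failure
  }

  static WriterResponse write(List<String> tupleIds, List<?> messages,
                              Function<Object, String> serializer, List<String> outbox) {
    WriterResponse response = new WriterResponse();
    for (int i = 0; i < tupleIds.size(); i++) {
      String tupleId = tupleIds.get(i);
      String payload;
      try {
        // Serialization may throw; catch it per tuple instead of letting it escape the loop.
        payload = serializer.apply(messages.get(i));
      } catch (RuntimeException e) {
        response.errors.put(tupleId, e);
        continue; // skip only this tuple; keep processing the rest of the batch
      }
      outbox.add(payload); // stand-in for kafkaProducer.send(new ProducerRecord<>(topic, payload))
      response.handedOff.add(tupleId);
    }
    return response;
  }

  public static void main(String[] args) {
    List<String> outbox = new ArrayList<>();
    // The second message is null, so this illustrative serializer throws for it.
    WriterResponse r = write(Arrays.asList("t1", "t2", "t3"), Arrays.asList("a", null, "c"),
        m -> {
          if (m == null) {
            throw new IllegalArgumentException("cannot serialize null message");
          }
          return m.toString();
        },
        outbox);
    System.out.println(r.handedOff + " " + r.errors.keySet() + " " + outbox);
  }
}
```

Running `main` reports `t1` and `t3` as handed off and `t2` as an error, without aborting the batch. In the real writer a tuple would only count as a success once its send future completes; here "handed off" just means it reached the stand-in producer.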


> KafkaWriter is asynchronous and may lose data on node failure
> -------------------------------------------------------------
>
>                 Key: METRON-1594
>                 URL: https://issues.apache.org/jira/browse/METRON-1594
>             Project: Metron
>          Issue Type: Bug
>            Reporter: Michael Miklavcic
>            Assignee: Michael Miklavcic
>            Priority: Major
>
> Currently, KafkaWriter neither blocks until data has been sent nor batches.
> Unfortunately, the send call is asynchronous, so if the node dies before
> Kafka has actually sent the data, that data is dropped. It is highly unlikely
> that we will be able to make KafkaWriter synchronous in a performant way, so
> we will likely need to batch in the parser and enrichment topologies.
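The risk described in the issue comes from `KafkaProducer.send` returning a `Future<RecordMetadata>` immediately, so a tuple can be acknowledged before Kafka has accepted the record. A minimal sketch of one mitigation, assuming the writer sends a whole batch first and then blocks on each returned future before reporting success (the `results` list of tuple/future pairs in the diff appears to be heading this way): tuples whose futures complete normally are safe to ack, and the rest are reported as failures. Assuming a reliable spout with acking enabled, Storm eventually replays a tuple that was never acked, so a crash before the futures complete should lead to redelivery rather than silent loss. `asyncSend`, `BatchResult`, and the string tuple ids are hypothetical stand-ins, and `CompletableFuture` only simulates the producer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: asyncSend stands in for KafkaProducer.send, which returns a Future.
final class BlockingBatchSend {

  static final class BatchResult {
    final List<String> acked = new ArrayList<>();                // confirmed sent; safe to ack
    final Map<String, Throwable> failed = new LinkedHashMap<>(); // should be failed and replayed
  }

  static BatchResult sendBatch(List<String> tupleIds, Function<String, Future<?>> asyncSend) {
    // Phase 1: issue every send without blocking, so the producer can batch them.
    List<Future<?>> futures = new ArrayList<>();
    for (String id : tupleIds) {
      futures.add(asyncSend.apply(id));
    }
    // Phase 2: block on each future; only a completed send counts as written.
    BatchResult result = new BatchResult();
    for (int i = 0; i < tupleIds.size(); i++) {
      String id = tupleIds.get(i);
      try {
        futures.get(i).get();
        result.acked.add(id);
      } catch (ExecutionException e) {
        result.failed.put(id, e.getCause()); // unwrap the underlying send failure
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        result.failed.put(id, e);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Simulate one send that fails and two that succeed.
    CompletableFuture<String> brokerDown = new CompletableFuture<>();
    brokerDown.completeExceptionally(new RuntimeException("broker unavailable"));
    BatchResult r = sendBatch(Arrays.asList("t1", "t2", "t3"),
        id -> id.equals("t2") ? brokerDown : CompletableFuture.completedFuture(id));
    System.out.println("acked=" + r.acked + " failed=" + r.failed.keySet());
  }
}
```

Because all sends in phase 1 are issued before any `get()`, the producer is still free to group them into batches, and the writer waits roughly once per batch rather than once per message. That is the trade-off the issue points at: full synchronous sends per message are unlikely to be performant, while blocking at batch boundaries keeps most of the throughput.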



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
