[ https://issues.apache.org/jira/browse/METRON-1594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16501832#comment-16501832 ]

ASF GitHub Bot commented on METRON-1594:
----------------------------------------

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1045#discussion_r193081934

    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -156,33 +172,61 @@ public void configure(String sensorName, WriterConfiguration configuration) {
         }
       }
     
    +  @Override
    +  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config)
    +      throws Exception {
    +    if(this.zkQuorum != null && this.brokerUrl == null) {
    +      try {
    +        this.brokerUrl = Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum));
    +      } catch (Exception e) {
    +        throw new IllegalStateException("Cannot read kafka brokers from zookeeper and you didn't specify them, giving up!", e);
    +      }
    +    }
    +    this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
    +  }
    +
       public Map<String, Object> createProducerConfigs() {
         Map<String, Object> producerConfig = new HashMap<>();
         producerConfig.put("bootstrap.servers", brokerUrl);
         producerConfig.put("key.serializer", keySerializer);
         producerConfig.put("value.serializer", valueSerializer);
         producerConfig.put("request.required.acks", requiredAcks);
    +    producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, DEFAULT_BATCH_SIZE);
         producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs);
         producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
         return producerConfig;
       }
     
       @Override
    -  public void init() {
    -    if(this.zkQuorum != null && this.brokerUrl == null) {
    +  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations,
    +      Iterable<Tuple> tuples, List<JSONObject> messages) {
    +    BulkWriterResponse writerResponse = new BulkWriterResponse();
    +
    +    List<Map.Entry<Tuple, Future>> results = new ArrayList<>();
    +    int i = 0;
    +    for (Tuple tuple : tuples) {
    +      JSONObject message = messages.get(i++);
    +      Future future = kafkaProducer
    +          .send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString()));
    --- End diff --

    > I've made a conscious decision in this PR to not make any changes to how we go about managing serialization and deserialization.

    I am not suggesting that we change how we do serialization.

    I think we need to wrap the `message.toJSONString()` in a try/catch, so that an exception during serialization is handled as an error and added to the `BulkWriterResponse`, just like the other errors that we handle on lines 218 and 226.

    Right now, an error on one tuple will kill the whole batch. I would think we would want to handle all errors in the same way.


> KafkaWriter is asynchronous and may lose data on node failure
> -------------------------------------------------------------
>
>                 Key: METRON-1594
>                 URL: https://issues.apache.org/jira/browse/METRON-1594
>             Project: Metron
>          Issue Type: Bug
>            Reporter: Michael Miklavcic
>            Assignee: Michael Miklavcic
>            Priority: Major
>
> Currently, we do not block for data to be sent from kafka writer and we do
> not batch. Unfortunately, the send call is asynchronous, so if the node dies
> before the data is actually sent from kafka then it will drop data. It is
> highly unlikely that we will be able to make kafkawriter synchronous in a
> performant way, so we will likely need to batch in the parser and enrichment
> topologies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
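
The review comment above asks for `message.toJSONString()` to be wrapped in a try/catch so that a serialization failure is recorded against the offending tuple instead of aborting the whole batch. A minimal, self-contained sketch of that pattern follows. It is not the change made in the pull request: `Response`, `write`, the `serializer` function and the `sent` list are hypothetical stand-ins for Metron's `BulkWriterResponse`, `KafkaWriter.write`, `JSONObject.toJSONString()` and `kafkaProducer.send(...)`, chosen so the example compiles without Kafka or Storm on the classpath.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch only; every type here is a hypothetical stand-in.
class PerTupleSerializationSketch {

  /** Stand-in for a bulk writer response: tracks successes and per-tuple errors. */
  public static final class Response<T> {
    public final List<T> successes = new ArrayList<>();
    public final Map<T, Throwable> errors = new LinkedHashMap<>();
  }

  /**
   * Serializes and "sends" each message. A serialization failure is recorded
   * against its tuple and the loop moves on to the next tuple.
   */
  public static <T, M> Response<T> write(List<T> tuples, List<M> messages,
      Function<M, String> serializer, List<String> sent) {
    Response<T> response = new Response<>();
    int i = 0;
    for (T tuple : tuples) {
      M message = messages.get(i++);
      String payload;
      try {
        // Stand-in for message.toJSONString(); may throw for a bad message.
        payload = serializer.apply(message);
      } catch (Exception e) {
        // Record the failure for this tuple only; the rest of the batch continues.
        response.errors.put(tuple, e);
        continue;
      }
      // Stand-in for kafkaProducer.send(new ProducerRecord<>(topic, payload)).
      sent.add(payload);
      response.successes.add(tuple);
    }
    return response;
  }
}
```

With three tuples where the second message fails to serialize, this sketch reports the first and third as successes and attaches the exception to the second, which matches the "handle all errors in the same way" behaviour the comment asks for. In the real writer the send itself is asynchronous, so success would only be known after the returned `Future` completes; the sketch leaves that part out.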