[ 
https://issues.apache.org/jira/browse/EAGLE-670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601333#comment-15601333
 ] 

ASF GitHub Bot commented on EAGLE-670:
--------------------------------------

Github user haoch commented on a diff in the pull request:

    https://github.com/apache/incubator-eagle/pull/556#discussion_r84639212
  
    --- Diff: 
eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
 ---
    @@ -96,44 +103,63 @@ public void close() {
         }
     
         @SuppressWarnings( {"rawtypes", "unchecked"})
    -    protected PublishStatus emit(String topic, List<AlertStreamEvent> 
outputEvents) {
    +    protected void emit(String topic, List<AlertStreamEvent> outputEvents) 
{
             // we need to check producer here since the producer is invisable 
to extended kafka publisher
             if (producer == null) {
                 LOG.warn("KafkaProducer is null due to the incorrect 
configurations");
    -            return null;
    +            return;
             }
             if (outputEvents == null) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Alert stream events list in publishment is 
empty");
                 }
    -            return null;
    +            return;
             }
             this.status = new PublishStatus();
             try {
                 for (AlertStreamEvent outputEvent : outputEvents) {
                     ProducerRecord record = createRecord(outputEvent, topic);
                     if (record == null) {
                         LOG.error("Alert serialize return null, ignored 
message! ");
    -                    return null;
    +                    return;
    +                }
    +                if (mode == KafkaWriteMode.sync) {
    +                    Future<?> future = producer.send(record);
    +                    future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    +                    succeed(mode, "");
    +                } else {
    +                    producer.send(record, new Callback() {
    --- End diff --
    
    This is not `async` mode in kafka language


> AlertEngine: Make Kafka Publisher configurable to async, for throughput tuning
> ------------------------------------------------------------------------------
>
>                 Key: EAGLE-670
>                 URL: https://issues.apache.org/jira/browse/EAGLE-670
>             Project: Eagle
>          Issue Type: Task
>    Affects Versions: v0.5.0
>            Reporter: Su Ralph
>            Assignee: Garrett Li
>             Fix For: v0.5.0
>
>
> Kafka send alert in sync would limit the throughput. Make this configurable, 
> and use async by default.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to