It hasn’t been discussed. Feel free to create a Jira issue and a pull request. 
Please make sure you include a test for your change. 

Ralph

> On Aug 21, 2019, at 12:31 AM, Federico D'Ambrosio <fedex...@gmail.com> wrote:
> 
> Hello everyone,
> 
> I wanted to ask whether you would consider it useful to add the possibility
> of sending the LogEvent time as the timestamp of the record when using the
> log4j KafkaAppender. I think it could be very useful for everyone using
> Kafka as a log aggregator to be able to use the event time, rather than the
> time the record is being sent.
> Bear with me, I've just started looking at the source code of KafkaAppender
> and may be overlooking something in the broader scope of log4j.
> 
> As far as I've seen in the source code, the message is sent by KafkaManager:
> 
> private void tryAppend(final LogEvent event) throws
>         ExecutionException, InterruptedException, TimeoutException {
>     final Layout<? extends Serializable> layout = getLayout();
>     byte[] data;
>     if (layout instanceof SerializedLayout) {
>         final byte[] header = layout.getHeader();
>         final byte[] body = layout.toByteArray(event);
>         data = new byte[header.length + body.length];
>         System.arraycopy(header, 0, data, 0, header.length);
>         System.arraycopy(body, 0, data, header.length, body.length);
>     } else {
>         data = layout.toByteArray(event);
>     }
>     manager.send(data);    // <-- hands the serialized event to KafkaManager#send
> }
> 
> with manager.send() implemented as follows (the line creating the
> ProducerRecord is marked with a comment):
> 
> public void send(final byte[] msg) throws ExecutionException,
>         InterruptedException, TimeoutException {
>     if (producer != null) {
>         byte[] newKey = null;
> 
>         if (key != null && key.contains("${")) {
>             newKey = getLoggerContext().getConfiguration().getStrSubstitutor()
>                     .replace(key).getBytes(StandardCharsets.UTF_8);
>         } else if (key != null) {
>             newKey = key.getBytes(StandardCharsets.UTF_8);
>         }
> 
>         // <-- the ProducerRecord is created here
>         final ProducerRecord<byte[], byte[]> newRecord =
>                 new ProducerRecord<>(topic, newKey, msg);
>         if (syncSend) {
>             final Future<RecordMetadata> response = producer.send(newRecord);
>             response.get(timeoutMillis, TimeUnit.MILLISECONDS);
>         } else {
>             producer.send(newRecord, new Callback() {
>                 @Override
>                 public void onCompletion(final RecordMetadata metadata, final Exception e) {
>                     if (e != null) {
>                         LOGGER.error("Unable to write to Kafka in appender ["
>                                 + getName() + "]", e);
>                     }
>                 }
>             });
>         }
>     }
> }
> 
> 
> Now, ProducerRecord has constructors with additional parameters; in
> particular, I'm looking at:
> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V-
> 
> public ProducerRecord(java.lang.String topic,
>                       java.lang.Integer partition,
>                       java.lang.Long timestamp,
>                       K key,
>                       V value)
> 
> which would allow us to set the timestamp to *LogEvent#getTimeMillis()*,
> but would also force us to pass the partition the record should be sent to.
> Still, the partitioning logic inside KafkaProducer is such that if the
> partition is null, the configured partitioner is used (DefaultPartitioner,
> or the one set via the 'partitioner.class' property), so we could simply
> pass null.
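> 
> For illustration, a minimal sketch of what the record creation could look
> like (eventTimeMillis is a hypothetical value carrying
> LogEvent#getTimeMillis(), which send() does not receive today):
> 
>     // Hypothetical: eventTimeMillis would be passed down from the LogEvent.
>     // A null partition leaves the choice to the configured partitioner.
>     final ProducerRecord<byte[], byte[]> newRecord =
>             new ProducerRecord<>(topic, null, eventTimeMillis, newKey, msg);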
> 
> In terms of interface, we could add a single flag in the KafkaAppender
> definition, something like:
> 
> <Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>
> 
> If the 'timestamp' flag is false, the record would be sent with the
> ProducerRecord's timestamp parameter set to null, leaving the behaviour
> exactly as it is now.
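> 
> A rough sketch of the resulting branch inside send() (the 'sendTimestamp'
> field and the 'eventTimeMillis' value are placeholders for however the flag
> and the event time would actually be plumbed through, not existing fields):
> 
>     final ProducerRecord<byte[], byte[]> newRecord;
>     if (sendTimestamp) {
>         // proposed: use the LogEvent time; null partition -> partitioner decides
>         newRecord = new ProducerRecord<>(topic, null, eventTimeMillis, newKey, msg);
>     } else {
>         // current behaviour: no explicit timestamp on the record
>         newRecord = new ProducerRecord<>(topic, newKey, msg);
>     }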
> 
> What do you think about this? Is it something that has already been
> discussed?
> 
> Thank you for your attention,
> Federico

