try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { try (DataOutputStream dos = new DataOutputStream(bos)) {
// First element is the timestamp dos.writeLong(System.currentTimeMillis()); // Second element is the class name, this element is used for deserializing the event at the consumer side dos.writeUTF(this.appLevelTopicId); //Third Element is subTopic dos.writeUTF(getSubTopic() ==null? "null" : getSubTopic()); // Fourth element is the class name, this element is used for deserializing the event at the consumer side dos.writeUTF(event.getClass().getName()); } try (OutputStreamWriter byteWriter = new OutputStreamWriter(bos, com.force.commons.text.EncodingUtil.UTF_ENCODING)) { Optional<ScrtContext> scrtContext = scrtContextProvider.getOptional(); if (scrtContext.isPresent()) { serializationService.toJson(scrtContext.get(), byteWriter); } else { byteWriter.write("null"); } // Sixth element is the actual event serializationService.toJson(event, byteWriter); // Seventh element is the request context if (RequestContext.get().isEstablished()) { serializationService.toJson(RequestContext.serialize(), byteWriter ); } } byte[] payload = bos.toByteArray(); ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte []>(kafkaTopicAttributes.getTopicName(),partitionNumber, null, payload); //Send is async by default and when the messages are published to the Kafka broker, Callback is executed with the status of the delivery kafkaProducer.send(data); On Wed, Mar 2, 2016 at 4:59 AM, Asaf Mesika <asaf.mes...@gmail.com> wrote: > Can you show your code for sending? > > On Tue, 1 Mar 2016 at 21:59 Fang Wong <fw...@salesforce.com> wrote: > > > [2016-02-26 20:33:42,997] INFO Closing socket connection to /x due to > > invalid request: Request of length 1937006964 is not valid, [2016-02-26 > > 20:33:42,997] INFO Closing socket connection to /10.224.146.58 due to > > invalid request: Request of length 1937006964 is not valid, it is larger > > than the maximum size of 104857600 bytes. (kafka.network.Processor) > > [2016-02-26 20:33:43,025] INFO Closing socket connection to / > 10.224.146.62 > > due to invalid request: Request of length 1937006964 is not valid, it is > > larger than the maximum size of 104857600 bytes. > (kafka.network.Processor) > > [2016-02-26 20:33:43,047] INFO Closing socket connection to / > 10.224.146.63 > > due to invalid request: Request of length 1937006964 is not valid, it is > > larger than the maximum size of 104857600 bytes. > (kafka.network.Processor) > > [2016-02-26 20:33:43,049] INFO Closing socket connection to / > 10.224.146.61 > > due to invalid request: Request of length 1937006964 is not valid, it is > > larger than the maximum size of 104857600 bytes. > (kafka.network.Processor) > > [2016-02-26 20:33:43,055] INFO Closing socket connection to / > 10.224.146.60 > > due to invalid request: Request of length 1937006964 is not valid, it is > > larger than the maximum size of 104857600 bytes. > (kafka.network.Processor) > > [2016-02-26 20:33:43,112] INFO Closing socket connection to / > 10.224.146.59 > > due to invalid request: Request of length 1937006964 is not valid, it is > > larger than the maximum size of 104857600 bytes. > > (kafka.network.Processor)of > > 104857600 bytes. (kafka.network.Processor) > > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to > > invalid request: Request of length 1937006964 is not valid, it is larger > > than the maximum size of 104857600 bytes. (kafka.network.Processor) > > [2016-02-26 20:33:43,047] INFO Closing socket connection to /x due to > > invalid request: Request of length 1937006964 is not valid, it is larger > > than the maximum size of 104857600 bytes. (kafka.network.Processor) > > [2016-02-26 20:33:43,049] INFO Closing socket connection to /x due to > > invalid request: Request of length 1937006964 is not valid, it is larger > > than the maximum size of 104857600 bytes. (kafka.network.Processor) > > [2016-02-26 20:33:43,055] INFO Closing socket connection to /x due to > > invalid request: Request of length 1937006964 is not valid, it is larger > > than the maximum size of 104857600 bytes. (kafka.network.Processor) > > [2016-02-26 20:33:43,112] INFO Closing socket connection to /x due to > > invalid request: Request of length 1937006964 is not valid, it is larger > > than the maximum size of 104857600 bytes. (kafka.network.Processor) > > >