try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {

    try (DataOutputStream dos = new DataOutputStream(bos)) {

    // First element is the timestamp

     dos.writeLong(System.currentTimeMillis());

    // Second element is the class name, this element is used for
deserializing the event at the consumer side

    dos.writeUTF(this.appLevelTopicId);

    //Third Element is subTopic

    dos.writeUTF(getSubTopic() ==null? "null" : getSubTopic());

    // Fourth element is the class name, this element is used for
deserializing the event at the consumer side

    dos.writeUTF(event.getClass().getName());

    }

    try (OutputStreamWriter byteWriter = new OutputStreamWriter(bos,
com.force.commons.text.EncodingUtil.UTF_ENCODING)) {

    Optional<ScrtContext> scrtContext = scrtContextProvider.getOptional();

      if (scrtContext.isPresent()) {

          serializationService.toJson(scrtContext.get(), byteWriter);

      } else {

          byteWriter.write("null");

      }

      // Sixth element is the actual event

      serializationService.toJson(event, byteWriter);

      // Seventh element is the request context

      if (RequestContext.get().isEstablished()) {

          serializationService.toJson(RequestContext.serialize(), byteWriter
);

      }

}

    byte[] payload = bos.toByteArray();

    ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte
[]>(kafkaTopicAttributes.getTopicName(),partitionNumber, null, payload);

     //Send is async by default and when the messages are published to the
Kafka broker, Callback is executed with the status of the delivery

    kafkaProducer.send(data);



On Wed, Mar 2, 2016 at 4:59 AM, Asaf Mesika <asaf.mes...@gmail.com> wrote:

> Can you show your code for sending?
>
> On Tue, 1 Mar 2016 at 21:59 Fang Wong <fw...@salesforce.com> wrote:
>
> > [2016-02-26 20:33:42,997] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, [2016-02-26
> > 20:33:42,997] INFO Closing socket connection to /10.224.146.58 due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,025] INFO Closing socket connection to /
> 10.224.146.62
> > due to invalid request: Request of length 1937006964 is not valid, it is
> > larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)
> > [2016-02-26 20:33:43,047] INFO Closing socket connection to /
> 10.224.146.63
> > due to invalid request: Request of length 1937006964 is not valid, it is
> > larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)
> > [2016-02-26 20:33:43,049] INFO Closing socket connection to /
> 10.224.146.61
> > due to invalid request: Request of length 1937006964 is not valid, it is
> > larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)
> > [2016-02-26 20:33:43,055] INFO Closing socket connection to /
> 10.224.146.60
> > due to invalid request: Request of length 1937006964 is not valid, it is
> > larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)
> > [2016-02-26 20:33:43,112] INFO Closing socket connection to /
> 10.224.146.59
> > due to invalid request: Request of length 1937006964 is not valid, it is
> > larger than the maximum size of 104857600 bytes.
> > (kafka.network.Processor)of
> > 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,047] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,049] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,055] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,112] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> >
>

Reply via email to