Also, the key serializer is
org.apache.kafka.common.serialization.StringSerializer and the value
serializer is org.apache.kafka.common.serialization.ByteArraySerializer.
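
For reference, a minimal sketch of how these two serializers are usually set
in a producer configuration. The broker address is a placeholder, and the
class and method names are made up for illustration; only the three property
keys and the two serializer class names come from Kafka itself.

```java
import java.util.Properties;

class ProducerConfigSketch {
    // Builds producer settings with the serializers named above.
    // bootstrapServers is a placeholder, not the real cluster address.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A Properties object like this would be passed to the KafkaProducer
constructor; that step is left out so the sketch has no Kafka dependency.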

On Wed, Mar 2, 2016 at 10:24 AM, Fang Wong <fw...@salesforce.com> wrote:

> try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
>     try (DataOutputStream dos = new DataOutputStream(bos)) {
>         // First element is the timestamp
>         dos.writeLong(System.currentTimeMillis());
>         // Second element is the application-level topic id
>         dos.writeUTF(this.appLevelTopicId);
>         // Third element is the subTopic
>         dos.writeUTF(getSubTopic() == null ? "null" : getSubTopic());
>         // Fourth element is the class name, used for deserializing the
>         // event at the consumer side
>         dos.writeUTF(event.getClass().getName());
>     }
>     try (OutputStreamWriter byteWriter = new OutputStreamWriter(bos,
>             com.force.commons.text.EncodingUtil.UTF_ENCODING)) {
>         // Fifth element is the ScrtContext as JSON, or "null" if absent
>         Optional<ScrtContext> scrtContext = scrtContextProvider.getOptional();
>         if (scrtContext.isPresent()) {
>             serializationService.toJson(scrtContext.get(), byteWriter);
>         } else {
>             byteWriter.write("null");
>         }
>         // Sixth element is the actual event
>         serializationService.toJson(event, byteWriter);
>         // Seventh element is the request context
>         if (RequestContext.get().isEstablished()) {
>             serializationService.toJson(RequestContext.serialize(), byteWriter);
>         }
>     }
>     byte[] payload = bos.toByteArray();
>     ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(
>             kafkaTopicAttributes.getTopicName(), partitionNumber, null, payload);
>     // send() is asynchronous. A Callback passed as a second argument would
>     // be invoked with the delivery status; none is passed here.
>     kafkaProducer.send(data);
> }
>
>
> On Wed, Mar 2, 2016 at 4:59 AM, Asaf Mesika <asaf.mes...@gmail.com> wrote:
>
>> Can you show your code for sending?
>>
>> On Tue, 1 Mar 2016 at 21:59 Fang Wong <fw...@salesforce.com> wrote:
>>
>> > [2016-02-26 20:33:42,997] INFO Closing socket connection to /10.224.146.58 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,025] INFO Closing socket connection to /10.224.146.62 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,047] INFO Closing socket connection to /10.224.146.63 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,049] INFO Closing socket connection to /10.224.146.61 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,055] INFO Closing socket connection to /10.224.146.60 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,112] INFO Closing socket connection to /10.224.146.59 due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> >
>> > [2016-02-26 20:33:42,997] INFO Closing socket connection to /x due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,047] INFO Closing socket connection to /x due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,049] INFO Closing socket connection to /x due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,055] INFO Closing socket connection to /x due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,112] INFO Closing socket connection to /x due to invalid request: Request of length 1937006964 is not valid, it is larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> >
>>
>
>
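
To make the payload layout in the quoted producer code easier to check, here
is a rough consumer-side sketch that reads the four DataOutputStream fields
back in the same order and returns whatever follows as UTF-8 text. The class
name, the Decoded holder, and the encode() stand-in are hypothetical and only
mimic the quoted writes; the JSON elements are returned as one undivided
string because the quoted code writes them back to back with no delimiter.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class PayloadHeaderSketch {
    // Hypothetical holder for the decoded fields; names are illustrative.
    static final class Decoded {
        final long timestamp;
        final String appLevelTopicId;
        final String subTopic;
        final String eventClassName;
        final String jsonTail; // elements five to seven, still concatenated

        Decoded(long timestamp, String appLevelTopicId, String subTopic,
                String eventClassName, String jsonTail) {
            this.timestamp = timestamp;
            this.appLevelTopicId = appLevelTopicId;
            this.subTopic = subTopic;
            this.eventClassName = eventClassName;
            this.jsonTail = jsonTail;
        }
    }

    // Reads the fields in the order the producer wrote them.
    static Decoded decode(byte[] payload) {
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(payload))) {
            long timestamp = in.readLong();
            String topicId = in.readUTF();
            String subTopic = in.readUTF();
            String className = in.readUTF();
            // available() is exact here because the source is an in-memory array.
            byte[] rest = new byte[in.available()];
            in.readFully(rest);
            return new Decoded(timestamp, topicId, subTopic, className,
                    new String(rest, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for the quoted producer code, used only to exercise decode().
    static byte[] encode(long timestamp, String topicId, String subTopic,
                         String className, String jsonTail) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeLong(timestamp);
            dos.writeUTF(topicId);
            dos.writeUTF(subTopic);
            dos.writeUTF(className);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing a ByteArrayOutputStream has no effect, so it is still writable.
        byte[] json = jsonTail.getBytes(StandardCharsets.UTF_8);
        bos.write(json, 0, json.length);
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = encode(1L, "topic", "null", "com.example.Event", "null{}");
        Decoded d = decode(payload);
        System.out.println(d.eventClassName + " " + d.jsonTail);
    }
}
```

Splitting jsonTail into its separate elements would need a streaming JSON
parser or an explicit delimiter, neither of which appears in the quoted code.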

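One way to read the broker log quoted above: Kafka frames every request with
a 4-byte big-endian size, so a "Request of length 1937006964" may simply be
the first four bytes of some non-Kafka traffic interpreted as a size. The
sketch below turns that number back into its four bytes and shows them as
ASCII. The class and method names are made up for illustration, and the
result only hints at what was sent to the broker port; it does not identify
which client sent it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class RequestLengthSketch {
    // Interprets a 4-byte size prefix as the ASCII text it may have come from.
    static String lengthAsAscii(int length) {
        // ByteBuffer is big-endian by default, matching network byte order.
        byte[] bytes = ByteBuffer.allocate(4).putInt(length).array();
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // 1937006964 == 0x73746174, i.e. the bytes 's', 't', 'a', 't'
        System.out.println(lengthAsAscii(1937006964)); // prints "stat"
    }
}
```

If that reading is right, the rejected connections would be something writing
plain text starting with "stat" to the Kafka port rather than the producer
code in this thread, which would be worth checking against whatever runs on
the six client hosts in the log.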