Also the key serializer is org.apache.kafka.common.serialization.StringSerializer and value serializer = org.apache.kafka.common.serialization.ByteArraySerializer.
On Wed, Mar 2, 2016 at 10:24 AM, Fang Wong <fw...@salesforce.com> wrote: > try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { > > try (DataOutputStream dos = new DataOutputStream(bos)) { > > // First element is the timestamp > > dos.writeLong(System.currentTimeMillis()); > > // Second element is the class name, this element is used for > deserializing the event at the consumer side > > dos.writeUTF(this.appLevelTopicId); > > //Third Element is subTopic > > dos.writeUTF(getSubTopic() ==null? "null" : getSubTopic()); > > // Fourth element is the class name, this element is used for > deserializing the event at the consumer side > > dos.writeUTF(event.getClass().getName()); > > } > > try (OutputStreamWriter byteWriter = new OutputStreamWriter(bos, > com.force.commons.text.EncodingUtil.UTF_ENCODING)) { > > Optional<ScrtContext> scrtContext = scrtContextProvider.getOptional(); > > if (scrtContext.isPresent()) { > > serializationService.toJson(scrtContext.get(), byteWriter); > > } else { > > byteWriter.write("null"); > > } > > // Sixth element is the actual event > > serializationService.toJson(event, byteWriter); > > // Seventh element is the request context > > if (RequestContext.get().isEstablished()) { > > serializationService.toJson(RequestContext.serialize(), > byteWriter); > > } > > } > > byte[] payload = bos.toByteArray(); > > ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte > []>(kafkaTopicAttributes.getTopicName(),partitionNumber, null, payload); > > //Send is async by default and when the messages are published to > the Kafka broker, Callback is executed with the status of the delivery > > > kafkaProducer.send(data); > > > > On Wed, Mar 2, 2016 at 4:59 AM, Asaf Mesika <asaf.mes...@gmail.com> wrote: > >> Can you show your code for sending? >> >> On Tue, 1 Mar 2016 at 21:59 Fang Wong <fw...@salesforce.com> wrote: >> >> > [2016-02-26 20:33:42,997] INFO Closing socket connection to /x due to >> > invalid request: Request of length 1937006964 is not valid, [2016-02-26 >> > 20:33:42,997] INFO Closing socket connection to /10.224.146.58 due to >> > invalid request: Request of length 1937006964 is not valid, it is larger >> > than the maximum size of 104857600 bytes. (kafka.network.Processor) >> > [2016-02-26 20:33:43,025] INFO Closing socket connection to / >> 10.224.146.62 >> > due to invalid request: Request of length 1937006964 is not valid, it is >> > larger than the maximum size of 104857600 bytes. >> (kafka.network.Processor) >> > [2016-02-26 20:33:43,047] INFO Closing socket connection to / >> 10.224.146.63 >> > due to invalid request: Request of length 1937006964 is not valid, it is >> > larger than the maximum size of 104857600 bytes. >> (kafka.network.Processor) >> > [2016-02-26 20:33:43,049] INFO Closing socket connection to / >> 10.224.146.61 >> > due to invalid request: Request of length 1937006964 is not valid, it is >> > larger than the maximum size of 104857600 bytes. >> (kafka.network.Processor) >> > [2016-02-26 20:33:43,055] INFO Closing socket connection to / >> 10.224.146.60 >> > due to invalid request: Request of length 1937006964 is not valid, it is >> > larger than the maximum size of 104857600 bytes. >> (kafka.network.Processor) >> > [2016-02-26 20:33:43,112] INFO Closing socket connection to / >> 10.224.146.59 >> > due to invalid request: Request of length 1937006964 is not valid, it is >> > larger than the maximum size of 104857600 bytes. >> > (kafka.network.Processor)of >> > 104857600 bytes. (kafka.network.Processor) >> > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to >> > invalid request: Request of length 1937006964 is not valid, it is larger >> > than the maximum size of 104857600 bytes. (kafka.network.Processor) >> > [2016-02-26 20:33:43,047] INFO Closing socket connection to /x due to >> > invalid request: Request of length 1937006964 is not valid, it is larger >> > than the maximum size of 104857600 bytes. (kafka.network.Processor) >> > [2016-02-26 20:33:43,049] INFO Closing socket connection to /x due to >> > invalid request: Request of length 1937006964 is not valid, it is larger >> > than the maximum size of 104857600 bytes. (kafka.network.Processor) >> > [2016-02-26 20:33:43,055] INFO Closing socket connection to /x due to >> > invalid request: Request of length 1937006964 is not valid, it is larger >> > than the maximum size of 104857600 bytes. (kafka.network.Processor) >> > [2016-02-26 20:33:43,112] INFO Closing socket connection to /x due to >> > invalid request: Request of length 1937006964 is not valid, it is larger >> > than the maximum size of 104857600 bytes. (kafka.network.Processor) >> > >> > >