Hello,

I have seen some (perhaps) strange behaviour from the brokers when producing two kinds of payload:

1) a pure byte array filled with random integers
2) a byte array read from a text file (about 5 MB on disk).

I used the following JUnit test code to send the random-integer payload (the payload-generation logic is the same as in ProducerPerformance from the Kafka tools):

>
> @Test
> public void testProduceRandomMessage_w_Integers() throws Exception {
>     String topicName = "testtopic11"; // topic with 1 partition per broker for this test
>     // props is the Properties instance built in setUp() (bootstrap.servers etc., not shown here)
>     props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>
>     KafkaProducer<byte[], byte[]> mockObject = new KafkaProducer<byte[], byte[]>(props);
>
>     Random random = new Random(0);
>     int recordSize = 4 * 1024 * 1024; // 4 MB payload
>     int numRecords = 100;
>     byte[] payload = new byte[recordSize];
>     for (int i = 0; i < payload.length; ++i) {
>         payload[i] = (byte) (random.nextInt(26) + 65);
>     }
>
>     File testFile = new File(getClass().getClassLoader().getResource("payload1.txt").getFile());
>     byte[] payload2 = Files.readAllBytes(testFile.toPath());
>     System.out.println("****Integer Payload size = " + payload.length);
>     System.out.println("****File payload size = " + payload2.length);
>
>     ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
>
>     try {
>         for (int i = 0; i < numRecords; i++) {
>             RecordMetadata rmd = mockObject.send(record).get();
>             System.out.println(rmd.checksum() + " | " + rmd.partition() + " | " + rmd.offset() + " | " + rmd.topic());
>             System.out.println(MessageFormat.format("*******### Message Size (in Bytes) = {0} ####*******", record.value().length));
>         }
>         mockObject.close();
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
> }


and got the following stack trace:


> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
> at com.i.myproject.MyProducerTest.testProduceRandomMessage_w_Integers(MyProducerTest.java:138)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
> at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
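
If I read the exception right, the limit it refers to is governed by the producer's max.request.size and the broker/topic-level max message size. I haven't changed any of these on my cluster, so I assume they are still at their defaults (roughly 1 MB); the snippet below only shows where I would raise them, and the 4 MB value is just chosen to match my payload, it is not something from the tests above:

> // Producer side: max.request.size caps the size of a single request
> // (default 1048576 bytes, i.e. 1 MB); 4 MB here simply matches my payload.
> props.put("max.request.size", Integer.toString(4 * 1024 * 1024));
>
> // Broker / topic side (set on the server, not in the producer props):
> //   message.max.bytes        broker-wide limit, roughly 1 MB by default
> //   max.message.bytes        per-topic override of message.max.bytes
> //   replica.fetch.max.bytes  should be at least as large as message.max.bytes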


When I use the file payload instead (which is very close to 5 MB on disk), however, I don't
get any error. The code for the file-based test is the following:

>
> @Test
> public void testProducer_w_MultipleFiles() {
>     File testFile = new File(getClass().getClassLoader().getResource("payload1.txt").getFile());
>     // same props instance as in the previous test
>     props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>     String topicName = "testtopic11";
>     KafkaProducer<byte[], byte[]> mockObject2 = new KafkaProducer<byte[], byte[]>(props);
>
>     byte[] fileBytes = {};
>     try {
>         fileBytes = Files.readAllBytes(testFile.toPath());
>     } catch (IOException ioe) {
>         ioe.printStackTrace();
>         return;
>     }
>
>     int numRecords = 1;
>     ProducerRecord<byte[], byte[]> fileInfo = new ProducerRecord<byte[], byte[]>(topicName, fileBytes);
>
>     try {
>         for (int i = 0; i < numRecords; i++) {
>             RecordMetadata rmd = mockObject2.send(fileInfo).get();
>             System.out.println(rmd.checksum() + " | " + rmd.partition() + " | " + rmd.offset() + " | " + rmd.topic());
>         }
>         mockObject2.close();
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
> }


Has anyone else seen this behaviour? Why are the file bytes treated differently
from the random-integer bytes once they are serialized?
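
The only difference I can think of so far is how well the two payloads compress: the random-letter array should compress much worse than the text file, so if compression.type happens to be set in my props (I still need to double-check that), the broker would see very different sizes for the two records. As a quick local sanity check I plan to gzip both payloads and compare, roughly like this (my own ad-hoc sketch, with gzip standing in for whatever codec the producer might use; this is not part of the tests above):

> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.zip.GZIPOutputStream;
>
> // Rough measure of how small each payload becomes after compression.
> private static int gzippedSize(byte[] data) throws IOException {
>     ByteArrayOutputStream bos = new ByteArrayOutputStream();
>     try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
>         gzip.write(data);
>     }
>     return bos.size();
> }
>
> // e.g. inside the first test:
> // System.out.println("random payload gzipped = " + gzippedSize(payload));
> // System.out.println("file payload gzipped   = " + gzippedSize(payload2));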

KR,
