Hello, I have seen some (perhaps) strange behaviour with the brokers when using payloads which are:
1) a pure byte array composed of random integers
2) a byte array read from a text file (about 5 MB on disk)

I tried to use the following JUnit test code to send the random-integer payload (similar logic is in ProducerPerformance in the Kafka tools):

```java
public void testProduceRandomMessage_w_Integers() throws Exception {
    String topicName = "testtopic11"; // This is a topic for a 1-partition-per-broker test
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    KafkaProducer<byte[], byte[]> mockObject = new KafkaProducer<byte[], byte[]>(props);

    Random random = new Random(0);
    int recordSize = 4 * 1024 * 1024; // 4 MB (value is in bytes)
    int numRecords = 100;
    byte[] payload = new byte[recordSize];

    for (int i = 0; i < payload.length; ++i) {
        payload[i] = (byte) (random.nextInt(26) + 65);
    }
    File testFile = new File(getClass().getClassLoader().getResource("payload1.txt").getFile());
    byte[] payload2 = Files.readAllBytes(testFile.toPath());
    System.out.println("****Integer Payload size = " + payload.length);
    System.out.println("****File payload size = " + payload2.length);

    ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);

    try {
        for (int i = 0; i < numRecords; i++) {
            RecordMetadata rmd = mockObject.send(record).get();
            // Note: the fields must be joined as strings, otherwise
            // checksum() + partition() is numeric addition.
            System.out.println(rmd.checksum() + " | " + rmd.partition() + " | "
                    + rmd.offset() + " | " + rmd.topic() + " | ");
            System.out.println(MessageFormat.format(
                    "*******### Message Size (in Bytes) = {0} ####*******",
                    record.value().length));
        }
        mockObject.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```

and got the following error stack:

```
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
```
```
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
	at com.i.myproject.MyProducerTest.testProduceRandomMessage_w_Integers(MyProducerTest.java:138)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
```

But when I used a file (which is very close to 5 MB on disk), I don't get any error. The code for the file-based test is the following:

```java
public void testProducer_w_MultipleFiles() {
    File testFile = new File(getClass().getClassLoader().getResource("payload1.txt").getFile());
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    String topicName = "testtopic11";
    KafkaProducer<byte[], byte[]> mockObject2 = new KafkaProducer<byte[], byte[]>(props);
    byte[] fileBytes = {};
    try {
        fileBytes = Files.readAllBytes(testFile.toPath());
    } catch (IOException ioe) {
        ioe.printStackTrace();
        return;
    }
    int numRecords = 1;
    ProducerRecord<byte[], byte[]> fileInfo = new ProducerRecord<byte[], byte[]>(topicName, fileBytes);
    try {
        for (int i = 0; i < numRecords; i++) {
            RecordMetadata rmd = mockObject2.send(fileInfo).get();
            // Join the fields as strings (see the note in the first test).
            System.out.println(rmd.checksum() + " | " + rmd.partition() + " | "
                    + rmd.offset() + " | " + rmd.topic() + " | ");
        }
        mockObject2.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```

Has anyone else seen this behaviour? Why are file bytes treated differently from random-integer bytes when serialized? KR,
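For reference, here is how I understand the size limits involved in RecordTooLargeException. This is only a sketch: `max.request.size` and `buffer.memory` are the standard producer configs, but the helper method itself is hypothetical, and the broker/topic-side limits (`message.max.bytes`, `replica.fetch.max.bytes`) would also need to be raised to match, since the default client limit is 1 MB (1048576 bytes):

```java
import java.util.Properties;

public class ProducerSizeConfig {

    // Hypothetical helper: copies a base config and raises the client-side
    // limits so the producer itself will accept a record of maxRecordBytes.
    // The broker still enforces its own message.max.bytes independently.
    public static Properties withLargeMessages(Properties base, int maxRecordBytes) {
        Properties props = new Properties();
        props.putAll(base);
        // Maximum size of a request the producer will send;
        // the client default is 1048576 (1 MB).
        props.put("max.request.size", Integer.toString(maxRecordBytes));
        // The record must also fit in the producer's buffer pool
        // (buffer.memory defaults to 32 MB, so only raise it if needed).
        props.put("buffer.memory",
                Integer.toString(Math.max(maxRecordBytes, 32 * 1024 * 1024)));
        return props;
    }

    public static void main(String[] args) {
        Properties p = withLargeMessages(new Properties(), 5 * 1024 * 1024);
        System.out.println("max.request.size = " + p.getProperty("max.request.size"));
    }
}
```

So a 4 MB or 5 MB record should be rejected with the default limits either way, which is why the difference between the two payloads puzzles me.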