Zhaofu Liu created FLINK-32593:
----------------------------------

             Summary: DelimitedInputFormat will cause record loss for multi-byte delimiters when a delimiter is separated into two splits
                 Key: FLINK-32593
                 URL: https://issues.apache.org/jira/browse/FLINK-32593
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.17.1, 1.16.2, 1.16.1
            Reporter: Zhaofu Liu
         Attachments: 5parallel.dat, image-2023-07-15-10-30-03-740.png

Run the following test to reproduce this bug.
{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import javax.xml.bind.DatatypeConverter;
import java.io.IOException;

public class MyTest {

    @Test
    public void myTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);

        String path = MyTest.class.getClassLoader().getResource("5parallel.dat").getPath();

        final DelimitedInputFormat<byte[]> inputFormat = new TestInputFormat();
        // The delimiter is "B87E7E7E"
        inputFormat.setDelimiter(new byte[]{(byte) 184, (byte) 126, (byte) 126, (byte) 126});
        // Set the buffer size below the default of 1M to make debugging easier
        inputFormat.setBufferSize(128);

        DataStreamSource<byte[]> source = env.readFile(inputFormat, path);

        // Print every record as a hex string; parallelism 1 keeps the output in one place
        source.map(new MapFunction<byte[], Object>() {
            @Override
            public Object map(byte[] value) throws Exception {
                System.out.println(DatatypeConverter.printHexBinary(value));
                return value;
            }
        }).setParallelism(1);

        env.execute();
    }

    // static, so the format does not hold a reference to the enclosing test instance
    private static class TestInputFormat extends DelimitedInputFormat<byte[]> {
        @Override
        public byte[] readRecord(byte[] reuse, byte[] bytes, int offset, int numBytes) throws IOException {
            final int delimiterLen = this.getDelimiter().length;

            if (numBytes > 0) {
                // Re-attach the delimiter in front of the record body
                byte[] record = new byte[delimiterLen + numBytes];
                System.arraycopy(this.getDelimiter(), 0, record, 0, delimiterLen);
                System.arraycopy(bytes, offset, record, delimiterLen, numBytes);
                return record;
            }

            return new byte[0];
        }
    }
}
{code}
The actual output is:
{code:java}
B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
B87E7E7E1A00EB900A4EDC6D5516
{code}
The expected output should be:
{code:java}
B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
B87E7E7E1A00EB900A4EDC6B52150070F6BE468EFD20BEEEB756E03FD7F653D0
B87E7E7E1A00EB900A4EDC6D5516
B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
{code}
View of a delimiter that is separated into two splits (the tail of line 2 and the head of line 3):

!image-2023-07-15-10-30-03-740.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
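
To check whether a given file can hit the case described in this report, the sketch below lists the split boundaries that fall inside a delimiter occurrence. It is a standalone illustration and does not use any Flink classes. The class name SplitBoundaryCheck, the sample bytes, and the assumption that splits start at multiples of a fixed splitSize are all hypothetical; Flink's actual split computation may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitBoundaryCheck {

    /** Returns true if delim occurs in data starting exactly at index pos. */
    static boolean matchesAt(byte[] data, byte[] delim, int pos) {
        if (pos < 0 || pos + delim.length > data.length) {
            return false;
        }
        for (int i = 0; i < delim.length; i++) {
            if (data[pos + i] != delim[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Lists every split boundary that falls strictly inside a delimiter occurrence.
     * Assumption (hypothetical layout): boundaries sit at multiples of splitSize.
     */
    static List<Integer> straddledBoundaries(byte[] data, byte[] delim, int splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<Integer> result = new ArrayList<>();
        for (int boundary = splitSize; boundary < data.length; boundary += splitSize) {
            // A delimiter straddles the boundary if it starts
            // 1 .. delim.length - 1 bytes before the boundary.
            for (int back = 1; back < delim.length; back++) {
                if (matchesAt(data, delim, boundary - back)) {
                    result.add(boundary);
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] delim = {(byte) 0xB8, 0x7E, 0x7E, 0x7E};
        // Made-up data: two 2-byte records, each preceded by the delimiter.
        byte[] data = {
            (byte) 0xB8, 0x7E, 0x7E, 0x7E, 0x01, 0x02,
            (byte) 0xB8, 0x7E, 0x7E, 0x7E, 0x03, 0x04
        };
        // Boundary 8 cuts the second delimiter (bytes 6..9) in two.
        System.out.println(straddledBoundaries(data, delim, 8));
        // Boundary 6 sits exactly at the start of the second delimiter.
        System.out.println(straddledBoundaries(data, delim, 6));
    }
}
```

With the made-up data, a split size of 8 reports boundary 8, while a split size of 6 reports nothing because the boundary coincides with the start of a delimiter. Running the same check against 5parallel.dat with the real split offsets could help confirm which boundaries trigger the record loss.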