Zhaofu Liu created FLINK-32593:
----------------------------------
Summary: DelimitedInputFormat will cause record loss for
multi-byte delimiters when a delimiter is separated into two splits
Key: FLINK-32593
URL: https://issues.apache.org/jira/browse/FLINK-32593
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.17.1, 1.16.2, 1.16.1
Reporter: Zhaofu Liu
Attachments: 5parallel.dat, image-2023-07-15-10-30-03-740.png
Run the following test to reproduce this bug.
{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import javax.xml.bind.DatatypeConverter;
import java.io.IOException;

public class MyTest {

    @Test
    public void myTest() throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);

        String path =
                MyTest.class.getClassLoader().getResource("5parallel.dat").getPath();

        final DelimitedInputFormat<byte[]> inputFormat = new TestInputFormat();
        // The delimiter is the hex byte sequence B87E7E7E
        inputFormat.setDelimiter(
                new byte[] {(byte) 184, (byte) 126, (byte) 126, (byte) 126});
        // Use a buffer smaller than the 1 MB default to make debugging easier
        inputFormat.setBufferSize(128);

        DataStreamSource<byte[]> source = env.readFile(inputFormat, path);

        // Print every record as hex; parallelism 1 keeps the output readable
        source.map(new MapFunction<byte[], Object>() {
            @Override
            public Object map(byte[] value) throws Exception {
                System.out.println(DatatypeConverter.printHexBinary(value));
                return value;
            }
        }).setParallelism(1);

        env.execute();
    }

    private class TestInputFormat extends DelimitedInputFormat<byte[]> {
        @Override
        public byte[] readRecord(byte[] reuse, byte[] bytes, int offset, int numBytes)
                throws IOException {
            final int delimiterLen = this.getDelimiter().length;
            if (numBytes > 0) {
                // Re-attach the delimiter in front of the record payload
                byte[] record = new byte[delimiterLen + numBytes];
                System.arraycopy(this.getDelimiter(), 0, record, 0, delimiterLen);
                System.arraycopy(bytes, offset, record, delimiterLen, numBytes);
                return record;
            }
            return new byte[0];
        }
    }
}
{code}
The actual output is:
{code:java}
B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
B87E7E7E1A00EB900A4EDC6D5516 {code}
The expected output should be:
{code:java}
B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
B87E7E7E1A00EB900A4EDC6B52150070F6BE468EFD20BEEEB756E03FD7F653D0
B87E7E7E1A00EB900A4EDC6D5516
B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181 {code}
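To cross-check the expected record list independently of Flink, the file can be scanned in a single pass with no splits involved. The following is only a sketch: the class name ReferenceSplitter and its helper methods are hypothetical and not part of Flink, and unlike the TestInputFormat above it skips empty records rather than emitting an empty array.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: splits a byte array on a
// multi-byte delimiter in one pass, so no split boundary can interfere.
class ReferenceSplitter {

    // Returns each non-empty record as upper-case hex, prefixed with the
    // delimiter (mirroring what TestInputFormat.readRecord produces).
    static List<String> split(byte[] data, byte[] delim) {
        List<String> out = new ArrayList<>();
        int recordStart = 0;
        int i = 0;
        while (i + delim.length <= data.length) {
            if (matches(data, i, delim)) {
                emit(out, data, recordStart, i, delim);
                i += delim.length;
                recordStart = i;
            } else {
                i++;
            }
        }
        // Trailing bytes after the last delimiter form the final record
        emit(out, data, recordStart, data.length, delim);
        return out;
    }

    private static boolean matches(byte[] data, int at, byte[] delim) {
        for (int j = 0; j < delim.length; j++) {
            if (data[at + j] != delim[j]) {
                return false;
            }
        }
        return true;
    }

    private static void emit(List<String> out, byte[] data, int from, int to, byte[] delim) {
        if (to > from) { // assumption: empty records are skipped
            out.add(hex(delim, 0, delim.length) + hex(data, from, to));
        }
    }

    private static String hex(byte[] bytes, int from, int to) {
        StringBuilder sb = new StringBuilder();
        for (int k = from; k < to; k++) {
            sb.append(String.format("%02X", bytes[k] & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: ReferenceSplitter <path-to-data-file>");
            return;
        }
        byte[] delim = {(byte) 0xB8, 0x7E, 0x7E, 0x7E};
        for (String record : split(Files.readAllBytes(Paths.get(args[0])), delim)) {
            System.out.println(record);
        }
    }
}
```

Running it against the attached 5parallel.dat prints one line per record. The record order follows the file, so it may differ from the order of a parallel Flink job.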
The image below shows a delimiter split across two input splits (the tail of
line 2 and the head of line 3):
!image-2023-07-15-10-30-03-740.png!
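One way such a loss can arise is shown in the toy model below. It is not Flink's actual DelimitedInputFormat code, and the class SplitBoundaryDemo and its methods are hypothetical. It assumes two rules. A reader whose split does not start at offset 0 skips ahead to just past the first complete delimiter it finds at or after its start offset. A reader stops once its position has moved past the end of its split. Under those assumptions, a record that follows a delimiter straddling the boundary is read by neither split.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of split-based reading (an assumption, not Flink's implementation).
class SplitBoundaryDemo {

    // Index of the first complete occurrence of delim at or after 'from', or -1.
    static int indexOf(byte[] data, byte[] delim, int from) {
        for (int i = from; i + delim.length <= data.length; i++) {
            boolean match = true;
            for (int j = 0; j < delim.length; j++) {
                if (data[i + j] != delim[j]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                return i;
            }
        }
        return -1;
    }

    // Naive reader for the split [start, end).
    static List<String> readSplit(byte[] data, byte[] delim, int start, int end) {
        List<String> records = new ArrayList<>();
        int pos = start;
        if (start > 0) {
            // Skip the partial first record: jump past the first complete
            // delimiter found at or after the split start.
            int d = indexOf(data, delim, start);
            if (d < 0) {
                return records;
            }
            pos = d + delim.length;
        }
        // Read records that begin at or before the split end.
        while (pos <= end && pos < data.length) {
            int d = indexOf(data, delim, pos);
            int recordEnd = (d < 0) ? data.length : d;
            records.add(new String(data, pos, recordEnd - pos, StandardCharsets.US_ASCII));
            if (d < 0) {
                break;
            }
            pos = d + delim.length;
        }
        return records;
    }

    // Reads the data as two splits, [0, boundary) and [boundary, length).
    static List<String> readAll(byte[] data, byte[] delim, int boundary) {
        List<String> all = new ArrayList<>(readSplit(data, delim, 0, boundary));
        all.addAll(readSplit(data, delim, boundary, data.length));
        return all;
    }

    public static void main(String[] args) {
        byte[] data = "aa##bb##cc".getBytes(StandardCharsets.US_ASCII);
        byte[] delim = "##".getBytes(StandardCharsets.US_ASCII);
        // Boundary right after the first delimiter: nothing is lost.
        System.out.println(readAll(data, delim, 4)); // prints [aa, bb, cc]
        // Boundary inside the first delimiter: "bb" is lost.
        System.out.println(readAll(data, delim, 3)); // prints [aa, cc]
    }
}
```

With the boundary at offset 3, the first reader stops after "aa" because the straddling delimiter carries its position past the split end. The second reader sees only the second half of that delimiter, fails to match it, and skips ahead to the next complete delimiter, discarding "bb" on the way.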
--
This message was sent by Atlassian Jira
(v8.20.10#820010)