Zhaofu Liu created FLINK-32593:
----------------------------------

             Summary: DelimitedInputFormat will cause record loss for 
multi-bytes delimit when a delimit is seperated to two splits
                 Key: FLINK-32593
                 URL: https://issues.apache.org/jira/browse/FLINK-32593
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.17.1, 1.16.2, 1.16.1
            Reporter: Zhaofu Liu
         Attachments: 5parallel.dat, image-2023-07-15-10-30-03-740.png

Run the following test to reproduce this bug.
{code:java}
// code placeholder
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import javax.xml.bind.DatatypeConverter;
import java.io.IOException;

public class MyTest {

  @Test
  public void myTest() throws Exception {
    final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(5);

    String path = 
MyTest.class.getClassLoader().getResource("5parallel.dat").getPath();

    final DelimitedInputFormat<byte[]> inputFormat = new TestInputFormat();
    // The delimiter is "B87E7E7E"
    inputFormat.setDelimiter(new byte[]{(byte) 184, (byte) 126, (byte) 126, 
(byte) 126});
    // Set buffer size less than default value of 1M for easily debugging
    inputFormat.setBufferSize(128);

    DataStreamSource<byte[]> source = env.readFile(inputFormat, path);

    source.map(new MapFunction<byte[], Object>() {
      @Override
      public Object map(byte[] value) throws Exception {
        System.out.println(DatatypeConverter.printHexBinary(value));
        return value;
      }
    }).setParallelism(1);

    env.execute();
  }

  private class TestInputFormat extends DelimitedInputFormat<byte[]> {
    @Override
    public byte[] readRecord(byte[] reuse, byte[] bytes, int offset, int 
numBytes) throws IOException {
      final int delimiterLen = this.getDelimiter().length;

      if (numBytes > 0) {
        byte[] record = new byte[delimiterLen + numBytes];
        System.arraycopy(this.getDelimiter(), 0, record, 0, delimiterLen);
        System.arraycopy(bytes, offset, record, delimiterLen, numBytes);
        return record;
      }

      return new byte[0];
    }
  }
}
 {code}
 

The actually output result is:
{code:java}
// code placeholder
B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
B87E7E7E1A00EB900A4EDC6D5516 {code}
 

The expected output result shoud be:
{code:java}
// code placeholder
B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
B87E7E7E1A00EB900A4EDC6B52150070F6BE468EFD20BEEEB756E03FD7F653D0
B87E7E7E1A00EB900A4EDC6D5516
B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181 {code}
The view of a delimit is seperated to two splits (The tail of line 2 and head 
of line 3):

!image-2023-07-15-10-30-03-740.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to