I am trying to implement a class that works similarly to AvroFileFormat. The tar archive I am reading has a very specific format: it contains only one file, and that file is line-delimited JSON.
I get the exception below, even though all the data is written to the temporary files. I have checked that my code isn't closing the stream, which was the cause of my earlier issue.

Caused by: java.nio.channels.ClosedChannelException
    at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
    at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
    at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
    at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
    at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
    at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/000000000000000000.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.base/java.lang.Thread.run(Thread.java:836)

/**
 * File input format that reads the first entry of a tar archive as line-delimited JSON
 * and deserializes every non-blank line into an instance of {@code E}.
 *
 * <p>Only the first tar entry is consumed; any further entries in the archive are ignored.
 * The format declares itself unsplittable and requests a single split, so one archive is
 * always read sequentially from its beginning.
 *
 * @param <E> type each JSON line is mapped to
 */
public class TarInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {

    private static final Logger logger = LoggerFactory.getLogger(TarInputFormat.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /** Target class handed to Jackson for every line. */
    private final Class<E> valueType;

    // Per-split runtime state. It is rebuilt in open(), so it is kept out of the
    // serialized form of the format.
    private transient TarArchiveInputStream tarArchiveInputStream;
    private transient TarArchiveEntry nextEntry;
    /** Number of bytes of the current entry consumed so far. */
    private transient long currentPosition;

    /**
     * Creates a format that reads the archive at {@code filePath}.
     *
     * @param filePath  location of the tar archive
     * @param valueType class every JSON line is deserialized into
     */
    public TarInputFormat(Path filePath, Class<E> valueType) {
        super(filePath);
        this.valueType = valueType;
        this.unsplittable = true;
        this.setNumSplits(1);
    }

    /** Returns the type information derived from the configured value class. */
    @Override
    public TypeInformation<E> getProducedType() {
        return TypeExtractor.getForClass(this.valueType);
    }

    /**
     * Opens the split and positions the reader at the start of the first tar entry.
     *
     * @param split the file split to read
     * @throws IOException if the file cannot be opened or the tar header cannot be read
     */
    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        tarArchiveInputStream = new TarArchiveInputStream(stream);
        // Start counting from zero again in case this instance is reused for another split.
        currentPosition = 0L;
        nextEntry = tarArchiveInputStream.getNextTarEntry();
        if (nextEntry == null) {
            // An archive without entries is treated as empty input instead of failing with an NPE.
            logger.warn("Tar archive {} contains no entries", split.getPath());
        } else {
            logger.info("Entry Name={} size={}", nextEntry.getName(), nextEntry.getSize());
        }
    }

    /**
     * Closes the tar stream and then the underlying file stream.
     *
     * @throws IOException if closing either stream fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (tarArchiveInputStream != null) {
                tarArchiveInputStream.close();
            }
        } finally {
            // Drop the per-split state and always let the parent release its stream,
            // even when closing the tar wrapper failed.
            tarArchiveInputStream = null;
            nextEntry = null;
            super.close();
        }
    }

    /**
     * Reports whether the current entry has been fully consumed.
     *
     * @return {@code true} if there is no entry or all of its bytes have been read
     */
    @Override
    public boolean reachedEnd() throws IOException {
        return nextEntry == null || currentPosition >= nextEntry.getSize();
    }

    /**
     * Reads the next non-blank line of the entry and deserializes it.
     *
     * @param reuse ignored; a new object is created for every record
     * @return the deserialized record, or {@code null} once the entry is exhausted
     * @throws IOException if the archive is truncated or a line is not valid JSON for {@code E}
     */
    @Override
    public E nextRecord(E reuse) throws IOException {
        if (reachedEnd()) {
            return null;
        }
        // Logged per record, hence DEBUG rather than INFO.
        logger.debug("currentPosition={}", currentPosition);
        while (!reachedEnd()) {
            byte[] line = readLine();
            // Blank lines (for example a trailing empty line) carry no record; skip them
            // instead of handing empty content to the JSON parser.
            if (!isBlank(line)) {
                return objectMapper.readValue(line, valueType);
            }
        }
        return null;
    }

    /** Reads bytes up to, and excluding, the next {@code '\n'} or the end of the entry. */
    private byte[] readLine() throws IOException {
        final long entrySize = nextEntry.getSize();
        final ByteArrayOutputStream lineBuffer = new ByteArrayOutputStream();
        while (currentPosition < entrySize) {
            final int c = tarArchiveInputStream.read();
            if (c < 0) {
                // The header announced more bytes than the stream delivered.
                throw new IOException("Unexpected end of tar entry " + nextEntry.getName()
                        + " after " + currentPosition + " of " + entrySize + " bytes");
            }
            currentPosition++;
            if (c == '\n') {
                break;
            }
            lineBuffer.write(c);
        }
        return lineBuffer.toByteArray();
    }

    /** Returns {@code true} if the line is empty or holds only spaces, tabs or carriage returns. */
    private static boolean isBlank(byte[] line) {
        for (byte b : line) {
            if (b != ' ' && b != '\t' && b != '\r') {
                return false;
            }
        }
        return true;
    }
}

Thanks.

--
Wayne D. Young aka Billy Bob Bain billybobb...@gmail.com