I am trying to implement a class that will work similarly to AvroFileFormat.
This tar archive has a very specific format: it contains only one file, and
that file is line-delimited JSON.
I get this exception, but all the data is written to the temporary files. I
have checked that my code isn't closing the stream, which was my prior
issue.
Caused by: java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
at
org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
at
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
at
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
at
org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
at
org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
at
org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/000000000000000000.run(Unknown
Source)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Thread.java:836)
/**
 * A Flink {@link FileInputFormat} that reads a tar archive containing exactly one
 * entry whose content is newline-delimited JSON, deserializing each line into an
 * instance of {@code valueType} with Jackson.
 *
 * <p>The input is marked unsplittable (a single split) because a tar stream cannot
 * be entered at an arbitrary byte offset.
 *
 * @param <E> the record type each JSON line is mapped to
 */
public class TarInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {

    private static final Logger logger = LoggerFactory.getLogger(TarInputFormat.class);

    // Jackson's ObjectMapper is thread-safe and expensive to construct; share one instance.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    private transient TarArchiveInputStream tarArchiveInputStream;
    // transient: TarArchiveEntry is not Serializable and Flink serializes input formats
    // when shipping them to task managers; the entry is re-read in open() anyway.
    private transient TarArchiveEntry nextEntry;
    private final Class<E> valueType;
    // Bytes of the current tar entry consumed so far; compared against the entry's
    // declared size to detect end-of-entry.
    private long currentPosition = 0L;

    /**
     * Creates the format for the given file.
     *
     * @param filePath  path to the tar archive
     * @param valueType concrete class each JSON line is deserialized into
     */
    public TarInputFormat(Path filePath, Class<E> valueType) {
        super(filePath);
        this.valueType = valueType;
        // Tar entries cannot be decompressed from a mid-stream offset, so force one split.
        this.unsplittable = true;
        this.setNumSplits(1);
    }

    @Override
    public TypeInformation<E> getProducedType() {
        return TypeExtractor.getForClass(this.valueType);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        // Reset per-split state: Flink may reuse this format instance for another split,
        // and a stale position would make reachedEnd() report true immediately.
        currentPosition = 0L;
        tarArchiveInputStream = new TarArchiveInputStream(stream);
        nextEntry = tarArchiveInputStream.getNextTarEntry();
        if (nextEntry == null) {
            // Empty archive: guard the log call (the original NPE'd here) and let
            // reachedEnd() report true right away.
            logger.warn("Tar archive {} contains no entries", split.getPath());
        } else {
            logger.info("Entry Name={} size={}", nextEntry.getName(), nextEntry.getSize());
        }
    }

    @Override
    public void close() throws IOException {
        super.close();
        if (tarArchiveInputStream != null) {
            tarArchiveInputStream.close();
        }
    }

    @Override
    public boolean reachedEnd() throws IOException {
        // >= rather than ==: if the position ever overshoots the declared size
        // (e.g. after a truncated read), iteration still terminates.
        return nextEntry == null || currentPosition >= nextEntry.getSize();
    }

    /**
     * Reads the next newline-delimited JSON record from the single tar entry.
     * Blank lines (including a trailing newline at end of entry) are skipped
     * instead of being handed to Jackson, which would reject empty input.
     *
     * @param reuse ignored; Jackson produces a new instance for every record
     * @return the deserialized record, or {@code null} once the entry is exhausted
     * @throws IOException if reading the stream or deserializing a line fails
     */
    @Override
    public E nextRecord(E reuse) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        while (!reachedEnd()) {
            logger.debug("currentPosition={}", currentPosition); // debug: fires once per record
            final long entrySize = nextEntry.getSize();
            bos.reset();
            while (currentPosition < entrySize) {
                int c = tarArchiveInputStream.read();
                if (c == -1) {
                    // Premature EOF: the entry header promised more bytes than the stream
                    // delivered. Stop here instead of buffering a bogus 0xFF byte.
                    logger.warn("Unexpected EOF at position {} of entry sized {}",
                            currentPosition, entrySize);
                    currentPosition = entrySize;
                    break;
                }
                currentPosition++;
                if (c == '\n') {
                    break;
                }
                bos.write(c);
            }
            // Only deserialize non-empty lines; loop again to skip blanks.
            if (bos.size() > 0) {
                return objectMapper.readValue(bos.toByteArray(), valueType);
            }
        }
        return null;
    }
}
Thanks.
--
Wayne D. Young
aka Billy Bob Bain
[email protected]