Hi,

I'm trying to read a Parquet file with the Flink 1.12.0 Scala API and save it as
another Parquet file.

Now it's working correctly with ParquetRowInputFormat:

val inputPath: String = ...
val messageType: MessageType = ...

val parquetInputFormat = new ParquetRowInputFormat(new Path(inputPath), messageType)
parquetInputFormat.setNestedFileEnumeration(true)

env.readFile(parquetInputFormat, inputPath)
    .map(row => { ... }) // map the Row to MyPOJO
    .sinkTo(FileSink.forBulkFormat...)
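
For context, the elided sink is built roughly like this (a minimal sketch only;
outputPath and the ParquetAvroWriters.forReflectRecord writer factory are placeholders
standing in for my actual bulk writer):

// Sketch only: outputPath and the writer factory below are placeholders.
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters

val outputPath: String = ...

val parquetSink: FileSink[MyPOJO] = FileSink
    .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(classOf[MyPOJO]))
    .build()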



But when I replace the input format with ParquetPojoInputFormat:

val pojoTypeInfo = Types.POJO(classOf[MyPOJO]).asInstanceOf[PojoTypeInfo[MyPOJO]]
val parquetInputFormat = new ParquetPojoInputFormat(new Path(inputPath), messageType, pojoTypeInfo)
parquetInputFormat.setNestedFileEnumeration(true)

env.createInput(parquetInputFormat)
    .sinkTo(FileSink.forBulkFormat...)
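
In case it matters, MyPOJO is a plain class along these lines (field names here are
purely illustrative, not my real schema):

// Illustrative only: a no-arg constructor and var fields, so that
// Types.POJO(classOf[MyPOJO]) can produce a PojoTypeInfo[MyPOJO].
class MyPOJO {
  var id: Long = 0L
  var name: String = _
}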



The job always fails with this exception:

java.nio.channels.ClosedChannelException
        at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:156)
        at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:331)
        at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
        at org.apache.flink.formats.parquet.PositionOutputStreamAdapter.getPos(PositionOutputStreamAdapter.java:50)
        at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:349)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:171)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
        at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)
        at org.apache.flink.formats.parquet.ParquetBulkWriter.finish(ParquetBulkWriter.java:62)
        at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:60)
        at org.apache.flink.connector.file.sink.writer.FileWriterBucket.closePartFile(FileWriterBucket.java:276)
        at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:202)
        at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
        at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)

This exception is always thrown right after this warning:
warn [ContinuousFileReaderOperator] not processing any records while closed


I would have suspected the problem is in my file sink, but the same file sink
works with ParquetRowInputFormat.
Did I miss something?

Thank you!


