Re: Re: Implementing a TarInputFormat based on FileInputFormat

2021-01-07 Thread Yun Gao
Hi Billy,

I checked the provided example and it looks like a problem in the
ContinuousFileReaderOperator, so I created an issue for it [1]. As a
temporary workaround, you can disable chaining of the
ContinuousFileReaderOperator with the following operators:

    android.disableChaining().sinkTo(sink);
Best,
 Yun

[1] https://issues.apache.org/jira/browse/FLINK-20888




 -- Original Mail --
Sender: Billy Bain
Send Date: Thu Jan 7 04:02:34 2021
Recipients: Arvid Heise
CC: user, Billy Bain
Subject:Re: Implementing a TarInputFormat based on FileInputFormat

Hi Arvid, 

Thanks for the response. I have created a sample application with input data
and uploaded it to Google Drive. The sample data is in the archive, hence the
large size (27 MB).

https://drive.google.com/file/d/1dxpnDF3hPUPNlPO5p2tBf-88oOVV0qck/view?usp=sharing

To run it:
flink run -Dexecution.runtime-mode=BATCH -c com.billybobbain.AndroidTarReader \
    /path/to/flink-tar/build/libs/flink-tar-0.1.jar \
    --input_path /path/to/flink-tar/playstore-00.tar.gz --output_path /path/to/output/

The main class:
public class AndroidTarReader {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        String inputPath = parameter.get("input_path");
        String outputPath = parameter.get("output_path");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<AndroidData> android = env.readFile(
                new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath);
        final FileSink<AndroidData> sink = FileSink
                .forRowFormat(new Path(outputPath), new AndroidDataEncoder())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("filtered")
                        .withPartSuffix(".json")
                        .build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(1024 * 1024)
                        .build())
                .build();
        android.sinkTo(sink);
        env.execute("zMarket Android");
    }
}

On Tue, Jan 5, 2021 at 5:59 AM Arvid Heise  wrote:

Hi Billy,

the exception is happening on the output side. Input side looks fine. Could you 
maybe post more information about the sink?

On Mon, Dec 28, 2020 at 8:11 PM Billy Bain  wrote:

I am trying to implement a class that will work similar to AvroFileFormat. 

This tar archive has a very specific format. It has only one file inside and 
that file is line delimited JSON. 
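For readers unfamiliar with the layout described here: a tar archive is a sequence of 512-byte header blocks, each followed by the entry's content padded to a 512-byte boundary, with the entry size stored as an octal string at header offset 124. This is what TarArchiveInputStream parses under the hood; the JDK-only sketch below is illustrative (the class and helper names are invented, not from this thread):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class TinyTar {
    // Build a minimal single-entry tar: one 512-byte header, then the
    // content, zero-padded to the next 512-byte boundary.
    static byte[] makeTar(String name, byte[] content) {
        byte[] header = new byte[512];
        byte[] n = name.getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(n, 0, header, 0, n.length);            // name at offset 0
        byte[] size = String.format("%011o", content.length)    // octal size string
                .getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(size, 0, header, 124, size.length);    // size at offset 124
        int padded = ((content.length + 511) / 512) * 512;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(header, 0, header.length);
        out.write(content, 0, content.length);
        out.write(new byte[padded - content.length], 0, padded - content.length);
        return out.toByteArray();
    }

    // Entry name: NUL-terminated string in the first 100 header bytes.
    static String firstEntryName(byte[] tar) {
        int end = 0;
        while (end < 100 && tar[end] != 0) end++;
        return new String(tar, 0, end, StandardCharsets.US_ASCII);
    }

    // Entry size: octal digits at offset 124, parsed base 8.
    static long firstEntrySize(byte[] tar) {
        String octal = new String(tar, 124, 11, StandardCharsets.US_ASCII).trim();
        return Long.parseLong(octal, 8);
    }

    // Content of the first entry starts right after the 512-byte header.
    static byte[] firstEntryContent(byte[] tar) {
        int size = (int) firstEntrySize(tar);
        return Arrays.copyOfRange(tar, 512, 512 + size);
    }
}
```

Real tar headers also carry mode, owner, and a checksum field, which commons-compress validates; the sketch only shows the two fields that matter for "one file of line-delimited JSON".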

I get this exception, but all the data is written to the temporary files. I 
have checked that my code isn't closing the stream, which was my prior issue. 

Caused by: java.nio.channels.ClosedChannelException
    at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
    at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
    at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
    at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
    at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
    at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/00.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.base/java.lang.Thread.run(Thread.java:836)
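The top frames show the mechanics of the failure: LocalRecoverableFsDataOutputStream.getPos() calls FileChannel.position() on a channel that something has already closed. The JDK side of that behaviour is easy to reproduce in isolation; this sketch is purely illustrative and independent of Flink:

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ClosedChannelDemo {
    // Returns true if position() on an already-closed channel throws
    // ClosedChannelException, mirroring the getPos() call in the trace.
    static boolean positionAfterCloseThrows() {
        try {
            Path tmp = Files.createTempFile("demo", ".bin");
            FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE);
            ch.close();                 // the channel is closed early...
            try {
                ch.position();          // ...so the next position query fails
                return false;
            } catch (ClosedChannelException expected) {
                return true;
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

This is why the issue looks like a lifecycle/ordering problem between operators at end of input rather than a bug in the input format itself.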

public class TarInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {

    private static final Logger logger = LoggerFactory.getLogger(TarInputFormat.class);


Re: Implementing a TarInputFormat based on FileInputFormat

2021-01-05 Thread Arvid Heise
Hi Billy,

the exception is happening on the output side. Input side looks fine. Could
you maybe post more information about the sink?

On Mon, Dec 28, 2020 at 8:11 PM Billy Bain  wrote:

> I am trying to implement a class that will work similar to AvroFileFormat.
>
> This tar archive has a very specific format. It has only one file inside
> and that file is line delimited JSON.
>
> I get this exception, but all the data is written to the temporary files.
> I have checked that my code isn't closing the stream, which was my prior
> issue.
>
> [stack trace snipped; identical to the trace quoted earlier in the thread]
>
> public class TarInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
>
>     private static final Logger logger = LoggerFactory.getLogger(TarInputFormat.class);
>     private transient TarArchiveInputStream tarArchiveInputStream;
>     private TarArchiveEntry nextEntry;
>     private final Class<E> valueType;
>     private long currentPosition = 0L;
>     private static final ObjectMapper objectMapper = new ObjectMapper();
>
>     public TarInputFormat(Path filePath, Class<E> valueType) {
>         super(filePath);
>         this.valueType = valueType;
>         this.unsplittable = true;
>         this.setNumSplits(1);
>     }
>
>     @Override
>     public TypeInformation<E> getProducedType() {
>         return TypeExtractor.getForClass(this.valueType);
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);
>         tarArchiveInputStream = new TarArchiveInputStream(stream);
>         nextEntry = tarArchiveInputStream.getNextTarEntry();
>         logger.info("Entry Name={} size={}", nextEntry.getName(), nextEntry.getSize());
>     }
>
>     @Override
>     public void close() throws IOException {
>         super.close();
>         if (tarArchiveInputStream != null) {
>             tarArchiveInputStream.close();
>         }
>     }
>
>     @Override
>     public boolean reachedEnd() throws IOException {
>         return nextEntry == null || currentPosition == nextEntry.getSize();
>     }
>
>     @Override
>     public E nextRecord(E reuse) throws IOException {
>         if (reachedEnd()) {
>             return null;
>         }
>         logger.info("currentPosition={}", currentPosition);
>         int c;
>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>         while (currentPosition < nextEntry.getSize()) {
>             c = tarArchiveInputStream.read();
>             currentPosition++;
>             if (c == '\n') {
>                 break;
>             } else {
>                 bos.write(c);
>             }
>         }
>         return objectMapper.readValue(bos.toByteArray(), valueType);
>     }
> }
>
> Thanks.
>
> --
> Wayne D. Young
> aka Billy Bob Bain
> billybobb...@gmail.com
>
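The read loop in nextRecord() above consumes one byte at a time up to the entry size and splits records on '\n'. That logic can be exercised without Flink, commons-compress, or Jackson; the hypothetical helper below returns raw lines instead of mapped objects, but the splitting and position bookkeeping are the same (it also guards against an early end of stream, which the original relies on the entry size to prevent):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LineSplitter {
    // Mirrors TarInputFormat.nextRecord(): read until '\n' or until
    // entrySize bytes of the current entry have been consumed.
    static List<String> readLines(InputStream in, long entrySize) {
        List<String> records = new ArrayList<>();
        long position = 0;                       // same role as currentPosition
        try {
            while (position < entrySize) {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                while (position < entrySize) {
                    int c = in.read();
                    position++;
                    if (c == '\n' || c == -1) {  // record boundary or EOF
                        break;
                    }
                    bos.write(c);
                }
                records.add(new String(bos.toByteArray(), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```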


-- 

Arvid Heise | Senior Java Developer


