Hello,
These changes result in the following error:
$ lzop -d part-1-0
lzop: part-1-0: not a lzop file


public class BulkRecordLZOSerializer implements BulkWriter<KafkaRecord> {

    private final CompressionOutputStream compressedStream;

    public BulkRecordLZOSerializer(OutputStream stream) {
        CompressionCodecFactory factory = new
CompressionCodecFactory(new Configuration());
        try {
            compressedStream =
factory.getCodecByClassName("com.hadoop.compression.lzo.LzoCodec").createOutputStream(stream);
        } catch (IOException e) {
            throw new IllegalStateException("Unable to create LZO
OutputStream");
        }
    }

    public void addElement(KafkaRecord record) throws IOException {
        compressedStream.write(record.getValue());
        compressedStream.write('\n');
    }

    public void finish() throws IOException {
        compressedStream.flush();
        compressedStream.finish();
    }

    public void flush() throws IOException {
        compressedStream.flush();
    }
}


On Mon, Oct 21, 2019 at 11:17 PM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi,
>
> Seems like that you want to use "com.hadoop.compression.lzo.LzoCodec"
> instead of "com.hadoop.compression.lzo.*LzopCodec*" in the below line.
>
> compressedStream = 
> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>
>
> Regarding "lzop: unexpected end of file" problem, kindly add
> "compressedStream.flush()" in the below method to flush any leftover data
> before finishing.
>
> public void finish() throws IOException {
>   compressedStream.flush();
>   compressedStream.finish();
> }
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
>
> Regards,
> Ravi
>
> On Tue, Oct 22, 2019 at 4:10 AM amran dean <adfs54545...@gmail.com> wrote:
>
>> Hello,
>> I'm using BulkWriter to write newline-delimited, LZO-compressed files.
>> The logic is very straightforward (See code below).
>>
>> I am experiencing an issue decompressing the created files created in
>> this manner, consistently getting "lzop: unexpected end of file". Is this
>> an issue with caller of BulkWriter?
>>
>> (As an aside), using com.hadoop.compression.lzo.LzoCodec instead results
>> in gibberish. I'm very confused what is going on.
>>
>> private final CompressionOutputStream compressedStream;
>>
>> public BulkRecordLZOSerializer(OutputStream stream) {
>>     CompressionCodecFactory factory = new CompressionCodecFactory(new 
>> Configuration());
>>     try {
>>         compressedStream = 
>> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>>     } catch (IOException e) {
>>         throw new IllegalStateException("Unable to create LZO OutputStream");
>>     }
>> }
>>
>> public void addElement(KafkaRecord record) throws IOException {
>>     compressedStream.write(record.getValue());
>>     compressedStream.write('\n');
>> }
>>
>> public void finish() throws IOException {
>>     compressedStream.finish();
>> }
>>
>> public void flush() throws IOException {
>>     compressedStream.flush();
>> }
>>
>>

Reply via email to