Hi,

It seems like you want to use "com.hadoop.compression.lzo.LzoCodec"
instead of "com.hadoop.compression.lzo.*LzopCodec*" in the line below.

compressedStream = factory
    .getCodecByClassName("com.hadoop.compression.lzo.LzopCodec")
    .createOutputStream(stream);
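
With that change, the line would read roughly as follows (a sketch,
assuming the hadoop-lzo jar that provides LzoCodec is on your classpath):

compressedStream = factory
    .getCodecByClassName("com.hadoop.compression.lzo.LzoCodec")
    .createOutputStream(stream);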


Regarding the "lzop: unexpected end of file" problem, kindly add
"compressedStream.flush()" to the method below, so that any leftover
buffered data is flushed before finishing. The BulkWriter#finish
contract (linked below) requires flushing all internal buffers,
finishing the encoding, and writing any footers.

public void finish() throws IOException {
  compressedStream.flush();   // flush leftover buffered data first
  compressedStream.finish();  // finish the compressed stream (the underlying stream stays open)
}

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
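
For reference, this is roughly how such a writer is wired into a
StreamingFileSink (a sketch only: it assumes your BulkRecordLZOSerializer
implements BulkWriter<KafkaRecord>, and the output path is illustrative):

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// One writer instance is created per in-progress part file.
BulkWriter.Factory<KafkaRecord> writerFactory =
    (FSDataOutputStream out) -> new BulkRecordLZOSerializer(out);

// The sink calls finish() on the writer before it closes each part file.
StreamingFileSink<KafkaRecord> sink =
    StreamingFileSink.forBulkFormat(new Path("/tmp/output"), writerFactory)
        .build();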

Regards,
Ravi

On Tue, Oct 22, 2019 at 4:10 AM amran dean <adfs54545...@gmail.com> wrote:

> Hello,
> I'm using BulkWriter to write newline-delimited, LZO-compressed files. The
> logic is very straightforward (See code below).
>
> I am experiencing an issue decompressing files created in this
> manner, consistently getting "lzop: unexpected end of file". Is this an
> issue with the caller of BulkWriter?
>
> As an aside, using com.hadoop.compression.lzo.LzoCodec instead results
> in gibberish. I'm very confused about what is going on.
>
> private final CompressionOutputStream compressedStream;
>
> public BulkRecordLZOSerializer(OutputStream stream) {
>     CompressionCodecFactory factory =
>         new CompressionCodecFactory(new Configuration());
>     try {
>         compressedStream = factory
>             .getCodecByClassName("com.hadoop.compression.lzo.LzopCodec")
>             .createOutputStream(stream);
>     } catch (IOException e) {
>         throw new IllegalStateException("Unable to create LZO OutputStream");
>     }
> }
>
> public void addElement(KafkaRecord record) throws IOException {
>     compressedStream.write(record.getValue());
>     compressedStream.write('\n');
> }
>
> public void finish() throws IOException {
>     compressedStream.finish();
> }
>
> public void flush() throws IOException {
>     compressedStream.flush();
> }
>
>
