[
https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941797#comment-15941797
]
Luke Hutchison edited comment on FLINK-6185 at 3/25/17 8:22 PM:
----------------------------------------------------------------
Here's my simple gzip OutputFormat though, in case anyone else is looking for a
quick solution:
{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.zip.GZIPOutputStream;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
public class GZippableTextOutputFormat<T> implements OutputFormat<T> {
private static final long serialVersionUID = 1L;
private File file;
private PrintWriter writer;
private boolean gzip;
public GZippableTextOutputFormat(File file, WriteMode writeMode, boolean
gzip) {
if (writeMode != WriteMode.OVERWRITE) {
// Make this explicit, since we're about to overwrite the file
throw new IllegalArgumentException("writeMode must be
WriteMode.OVERWRITE");
}
this.file = file.getPath().endsWith(".gz") == gzip ? file : new
File(file.getPath() + ".gz");
this.gzip = gzip;
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
if (taskNumber < 0) {
throw new IllegalArgumentException("Invalid task number");
}
if (numTasks == 0 || numTasks > 1) {
throw new IllegalArgumentException(
"must call setParallelism(1) to use " +
ZippedJSONCollectionOutputFormat.class.getName());
}
try {
writer = gzip ? new PrintWriter(new GZIPOutputStream(new
FileOutputStream(file)))
: new PrintWriter(new FileOutputStream(file));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public File getFile() {
return file;
}
@Override
public void configure(Configuration parameters) {
}
@Override
public void writeRecord(T record) throws IOException {
writer.println(record.toString());
}
@Override
public void close() throws IOException {
if (writer != null) {
writer.close();
}
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "(" + file + ")";
}
}
{code}
was (Author: lukehutch):
Here's my simple gzip OutputFormat though, in case anyone else is looking for a
quick solution:
{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.zip.GZIPOutputStream;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
public class GZippableTextOutputFormat<T> implements OutputFormat<T> {
private static final long serialVersionUID = 1L;
private File file;
private PrintWriter writer;
private boolean gzip;
public GZippableTextOutputFormat(File file, WriteMode writeMode, boolean
gzip) {
if (writeMode != WriteMode.OVERWRITE) {
// Make this explicit, since we're about to overwrite the file
throw new IllegalArgumentException("writeMode must be
WriteMode.OVERWRITE");
}
this.file = file.getPath().endsWith(".gz") == gzip ? file : new
File(file.getPath() + ".gz");
this.gzip = gzip;
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
if (taskNumber < 0) {
throw new IllegalArgumentException("Invalid task number");
}
if (numTasks == 0 || numTasks > 1) {
throw new IllegalArgumentException(
"must call setParallelism(1) to use " +
ZippedJSONCollectionOutputFormat.class.getName());
}
try {
writer = gzip ? new PrintWriter(new GZIPOutputStream(new
FileOutputStream(file)))
: new PrintWriter(new FileOutputStream(file));
} catch (Exception e) {
close();
throw new RuntimeException(e);
}
}
public File getFile() {
return file;
}
@Override
public void configure(Configuration parameters) {
}
@Override
public void writeRecord(T record) throws IOException {
writer.println(record.toString());
}
@Override
public void close() throws IOException {
writer.close();
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "(" + file + ")";
}
}
{code}
> Output writers and OutputFormats need to support compression
> ------------------------------------------------------------
>
> Key: FLINK-6185
> URL: https://issues.apache.org/jira/browse/FLINK-6185
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
> Priority: Minor
>
> File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such
> as {{FileOutputFormat}} and its subclasses, and methods such as
> {{DataSet#writeAsText()}}) need the ability to transparently decompress and
> compress files. Primarily gzip would be useful, but it would be nice if this
> were pluggable to support bzip2, xz, etc.
> There could be options for autodetect (based on file extension and/or file
> content), which could be the default, as well as no compression or a selected
> compression method.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)