This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d9a9b8  [FLINK-10114] Add ORC BulkWriter support for StreamingFileSink
5d9a9b8 is described below

commit 5d9a9b85edf4994f05518f5cbaf6cd3e2888ae43
Author: Sivaprasanna S <sivaprasanna...@gmail.com>
AuthorDate: Fri Mar 20 22:46:20 2020 +0530

    [FLINK-10114] Add ORC BulkWriter support for StreamingFileSink
    
    This closes #11474.
---
 docs/dev/connectors/streamfile_sink.md             | 202 ++++++++++-
 flink-formats/flink-orc/pom.xml                    |   8 +
 .../org/apache/flink/orc/vector/Vectorizer.java    |  96 ++++++
 .../org/apache/flink/orc/writer/OrcBulkWriter.java |  77 +++++
 .../flink/orc/writer/OrcBulkWriterFactory.java     | 126 +++++++
 .../flink/orc/writer/PhysicalWriterImpl.java       | 371 +++++++++++++++++++++
 .../java/org/apache/flink/orc/data/Record.java     |  59 ++++
 .../flink/orc/util/OrcBulkWriterTestUtil.java      |  98 ++++++
 .../apache/flink/orc/vector/RecordVectorizer.java  |  57 ++++
 .../flink/orc/writer/OrcBulkWriterITCase.java      |  77 +++++
 .../apache/flink/orc/writer/OrcBulkWriterTest.java |  83 +++++
 11 files changed, 1252 insertions(+), 2 deletions(-)

diff --git a/docs/dev/connectors/streamfile_sink.md 
b/docs/dev/connectors/streamfile_sink.md
index 9fe047e..271019a 100644
--- a/docs/dev/connectors/streamfile_sink.md
+++ b/docs/dev/connectors/streamfile_sink.md
@@ -136,12 +136,12 @@ specifying an `Encoder` we have to specify 
[BulkWriter.Factory]({{ site.javadocs
 The `BulkWriter` logic defines how new elements added, flushed and how the 
bulk of records
 are finalized for further encoding purposes.
 
-Flink comes with three built-in BulkWriter factories:
+Flink comes with four built-in BulkWriter factories:
 
  - [ParquetWriterFactory]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/formats/parquet/ParquetWriterFactory.html)
  - [SequenceFileWriterFactory]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.html)
  - [CompressWriterFactory]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/formats/compress/CompressWriterFactory.html)
-
+ - [OrcBulkWriterFactory]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
 
 <div class="alert alert-info">
      <b>IMPORTANT:</b> Bulk Formats can only have `OnCheckpointRollingPolicy`, 
which rolls (ONLY) on every checkpoint.
@@ -204,6 +204,204 @@ input.addSink(sink)
 </div>
 </div>
 
+#### ORC Format
+ 
+To enable the data to be bulk encoded in ORC format, Flink offers 
[OrcBulkWriterFactory]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html) 
+which takes a concrete implementation of [Vectorizer]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/orc/vector/Vectorizer.html).
+
+Like any other columnar format that encodes data in bulk fashion, Flink's 
`OrcBulkWriter` writes the input elements in batches. It uses 
+ORC's `VectorizedRowBatch` to achieve this. 
+
+Since the input element has to be transformed to a `VectorizedRowBatch`, users 
have to extend the abstract `Vectorizer` 
+class and override the `vectorize(T element, VectorizedRowBatch batch)` 
method. As you can see, the method provides an 
+instance of `VectorizedRowBatch` to be used directly by the users so users 
just have to write the logic to transform the 
+input `element` to `ColumnVectors` and set them in the provided 
`VectorizedRowBatch` instance.
+
+For example, if the input element is of type `Person` which looks like: 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class Person {
+    private final String name;
+    private final int age;
+    ...
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+Then a child implementation that converts an element of type `Person` and sets 
its fields in the `VectorizedRowBatch` can look like: 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+public class PersonVectorizer extends Vectorizer<Person> implements 
Serializable {     
+       public PersonVectorizer(String schema) {
+               super(schema);
+       }
+       @Override
+       public void vectorize(Person element, VectorizedRowBatch batch) throws 
IOException {
+               BytesColumnVector nameColVector = (BytesColumnVector) 
batch.cols[0];
+               LongColumnVector ageColVector = (LongColumnVector) 
batch.cols[1];
+               int row = batch.size++;
+               nameColVector.setVal(row, 
element.getName().getBytes(StandardCharsets.UTF_8));
+               ageColVector.vector[row] = element.getAge();
+       }
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.nio.charset.StandardCharsets
+import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, 
LongColumnVector}
+
+class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
+
+  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
+    val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
+    val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
+    val row = batch.size
+    batch.size += 1
+    nameColVector.setVal(row, element.getName.getBytes(StandardCharsets.UTF_8))
+    ageColVector.vector(row) = element.getAge
+  }
+
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+To use the ORC bulk encoder in an application, users need to add the following 
dependency:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-orc{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+And then a `StreamingFileSink` that writes data in ORC format can be created 
like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.orc.writer.OrcBulkWriterFactory;
+
+String schema = "struct<_col0:string,_col1:int>";
+DataStream<Person> input = ...;
+
+final OrcBulkWriterFactory<Person> writerFactory = new 
OrcBulkWriterFactory<>(new PersonVectorizer(schema));
+
+final StreamingFileSink<Person> sink = StreamingFileSink
+       .forBulkFormat(outputBasePath, writerFactory)
+       .build();
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.orc.writer.OrcBulkWriterFactory
+
+val schema: String = "struct<_col0:string,_col1:int>"
+val input: DataStream[Person] = ...
+val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema));
+
+val sink: StreamingFileSink[Person] = StreamingFileSink
+    .forBulkFormat(outputBasePath, writerFactory)
+    .build()
+
+input.addSink(sink)
+
+{% endhighlight %}
+</div>
+</div>
+
+OrcBulkWriterFactory can also take Hadoop `Configuration` and `Properties` so 
that a custom Hadoop configuration and ORC 
+writer properties can be provided.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+String schema = ...;
+Configuration conf = ...;
+Properties writerProperties = new Properties();
+
+writerProperties.setProperty("orc.compress", "LZ4");
+// Other ORC supported properties can also be set similarly.
+
+final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
+    new PersonVectorizer(schema), writerProperties, conf);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val schema: String = ...
+val conf: Configuration = ...
+val writerProperties: Properties = new Properties()
+
+writerProperties.setProperty("orc.compress", "LZ4")
+// Other ORC supported properties can also be set similarly.
+
+val writerFactory = new OrcBulkWriterFactory(
+    new PersonVectorizer(schema), writerProperties, conf)
+{% endhighlight %}
+</div>
+</div> 
+
+The complete list of ORC writer properties can be found 
[here](https://orc.apache.org/docs/hive-config.html).
+
+Users who want to add user metadata to the ORC files can do so by calling 
`addUserMetadata(...)` inside the overriding 
+`vectorize(...)` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class PersonVectorizer extends Vectorizer<Person> implements 
Serializable {     
+       @Override
+       public void vectorize(Person element, VectorizedRowBatch batch) throws 
IOException {
+               ...
+               String metadataKey = ...;
+               ByteBuffer metadataValue = ...;
+               this.addUserMetadata(metadataKey, metadataValue);
+       }
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
+
+  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
+    ...
+    val metadataKey: String = ...
+    val metadataValue: ByteBuffer = ...
+    addUserMetadata(metadataKey, metadataValue)
+  }
+
+}
+
+{% endhighlight %}
+</div>
+</div>
+
 #### Hadoop SequenceFile format
 
 To use the SequenceFile bulk encoder in your application you need to add the 
following dependency:
diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml
index 879c8d3..dd8e4a5 100644
--- a/flink-formats/flink-orc/pom.xml
+++ b/flink-formats/flink-orc/pom.xml
@@ -114,6 +114,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
        </dependencies>
 
        <build>
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/Vectorizer.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/Vectorizer.java
new file mode 100644
index 0000000..a273a7d
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/Vectorizer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.vector;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.orc.writer.OrcBulkWriter;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class provides an abstracted set of methods to handle the lifecycle of 
{@link VectorizedRowBatch}.
+ *
+ * <p>Users have to extend this class and override the vectorize() method with 
the logic
+ * to transform the element to a {@link VectorizedRowBatch}.
+ *
+ * @param <T> The type of the element
+ */
+@PublicEvolving
+public abstract class Vectorizer<T> implements Serializable {
+
+       private final TypeDescription schema;
+
+       private transient Writer writer;
+
+       public Vectorizer(final String schema) {
+               checkNotNull(schema);
+               this.schema = TypeDescription.fromString(schema);
+       }
+
+       /**
+        * Provides the ORC schema.
+        *
+        * @return the ORC schema
+        */
+       public TypeDescription getSchema() {
+               return this.schema;
+       }
+
+       /**
+        * Users are not supposed to use this method since this is intended to 
be used only by the {@link OrcBulkWriter}.
+        *
+        * @param writer the underlying ORC Writer.
+        */
+       public void setWriter(Writer writer) {
+               this.writer = writer;
+       }
+
+       /**
+        * Adds arbitrary user metadata to the outgoing ORC file.
+        *
+        * <p>Users who want to dynamically add new metadata based on either 
the input
+        * or from an external system can do so by calling 
<code>addUserMetadata(...)</code>
+        * inside the overridden vectorize() method.
+        *
+        * @param key a key to label the data with.
+        * @param value the contents of the metadata.
+        */
+       public void addUserMetadata(String key, ByteBuffer value) {
+               this.writer.addUserMetadata(key, value);
+       }
+
+       /**
+        * Transforms the provided element to ColumnVectors and
+        * sets them in the exposed VectorizedRowBatch.
+        *
+        * @param element The input element
+        * @param batch The batch to write the ColumnVectors
+        * @throws IOException if there is an error while transforming the 
input.
+        */
+       public abstract void vectorize(T element, VectorizedRowBatch batch) 
throws IOException;
+
+}
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriter.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriter.java
new file mode 100644
index 0000000..e10c25a
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.orc.vector.Vectorizer;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that writes data in ORC format.
+ *
+ * @param <T> The type of element written.
+ */
+@Internal
+public class OrcBulkWriter<T> implements BulkWriter<T> {
+
+       private final Writer writer;
+       private final Vectorizer<T> vectorizer;
+       private final VectorizedRowBatch rowBatch;
+
+       OrcBulkWriter(Vectorizer<T> vectorizer, Writer writer) {
+               this.vectorizer = checkNotNull(vectorizer);
+               this.writer = checkNotNull(writer);
+               this.rowBatch = vectorizer.getSchema().createRowBatch();
+
+               // Configure the vectorizer with the writer so that users can 
add
+               // metadata on the fly through the Vectorizer#vectorize(...) 
method.
+               this.vectorizer.setWriter(this.writer);
+       }
+
+       @Override
+       public void addElement(T element) throws IOException {
+               vectorizer.vectorize(element, rowBatch);
+               if (rowBatch.size == rowBatch.getMaxSize()) {
+                       writer.addRowBatch(rowBatch);
+                       rowBatch.reset();
+               }
+       }
+
+       @Override
+       public void flush() throws IOException {
+               if (rowBatch.size != 0) {
+                       writer.addRowBatch(rowBatch);
+                       rowBatch.reset();
+               }
+       }
+
+       @Override
+       public void finish() throws IOException {
+               flush();
+               writer.close();
+       }
+
+}
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
new file mode 100644
index 0000000..b34ef07
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.orc.vector.Vectorizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.OrcFile;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory that creates an ORC {@link BulkWriter}. The factory takes a user
+ * supplied {@link Vectorizer} implementation to convert the element into an
+ * {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}.
+ *
+ * @param <T> The type of element to write.
+ */
+@PublicEvolving
+public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
+
+       /*
+       A dummy Hadoop Path to work around the current implementation of ORC 
WriterImpl which
+       works on the basis of a Hadoop FileSystem and Hadoop Path but since we 
use a customised
+       ORC PhysicalWriter implementation that uses Flink's own 
FSDataOutputStream as the
+       underlying/internal stream instead of Hadoop's FSDataOutputStream, we 
don't have to worry
+       about this usage.
+        */
+       private static final Path FIXED_PATH = new Path(".");
+
+       private final Vectorizer<T> vectorizer;
+       private final Properties writerProperties;
+       private final Map<String, String> confMap;
+
+       private OrcFile.WriterOptions writerOptions;
+
+       /**
+        * Creates a new OrcBulkWriterFactory using the provided Vectorizer
+        * implementation.
+        *
+        * @param vectorizer The vectorizer implementation to convert input
+        *                   record to a VectorizedRowBatch.
+        */
+       public OrcBulkWriterFactory(Vectorizer<T> vectorizer) {
+               this(vectorizer, new Configuration());
+       }
+
+       /**
+        * Creates a new OrcBulkWriterFactory using the provided Vectorizer, 
Hadoop
+        * Configuration.
+        *
+        * @param vectorizer The vectorizer implementation to convert input
+        *                   record to a VectorizedRowBatch.
+        */
+       public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration 
configuration) {
+               this(vectorizer, null, configuration);
+       }
+
+       /**
+        * Creates a new OrcBulkWriterFactory using the provided Vectorizer, 
Hadoop
+        * Configuration, ORC writer properties.
+        *
+        * @param vectorizer            The vectorizer implementation to 
convert input
+        *                          record to a VectorizedRowBatch.
+        * @param writerProperties  Properties that can be used in ORC 
WriterOptions.
+        */
+       public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties 
writerProperties, Configuration configuration) {
+               this.vectorizer = checkNotNull(vectorizer);
+               this.writerProperties = writerProperties;
+               this.confMap = new HashMap<>();
+
+               // Todo: Replace the Map based approach with a better approach
+               for (Map.Entry<String, String> entry : configuration) {
+                       confMap.put(entry.getKey(), entry.getValue());
+               }
+       }
+
+       @Override
+       public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
+               OrcFile.WriterOptions opts = getWriterOptions();
+               opts.physicalWriter(new PhysicalWriterImpl(out, opts));
+
+               return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, 
FIXED_PATH, opts));
+       }
+
+       private OrcFile.WriterOptions getWriterOptions() {
+               if (null == writerOptions) {
+                       Configuration conf = new Configuration();
+                       for (Map.Entry<String, String> entry : 
confMap.entrySet()) {
+                               conf.set(entry.getKey(), entry.getValue());
+                       }
+
+                       writerOptions = OrcFile.writerOptions(writerProperties, 
conf);
+                       writerOptions.setSchema(this.vectorizer.getSchema());
+               }
+
+               return writerOptions;
+       }
+}
+
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
new file mode 100644
index 0000000..cf9a06b
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import com.google.protobuf.CodedOutputStream;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.HadoopShims;
+import org.apache.orc.impl.OrcCodecPool;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.StreamName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
+
+/**
+ * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
+ *
+ * <p>Whereas PhysicalFsWriter implementation works on the basis of a Path,
+ * this implementation leverages Flink's {@link FSDataOutputStream} to write
+ * the compressed data.
+ *
+ * <p>NOTE: If the ORC dependency version is updated, this file may have to be
+ * updated as well to be in sync with the new version's PhysicalFsWriter.
+ */
+@Internal
+public class PhysicalWriterImpl implements PhysicalWriter {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PhysicalWriterImpl.class);
+       private static final byte[] ZEROS = new byte[64 * 1024];
+       private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+
+       private final OutStream writer;
+       private final CodedOutputStream protobufWriter;
+       private final CompressionKind compress;
+       private final Map<StreamName, BufferedStream> streams;
+       private final HadoopShims shims;
+       private final int maxPadding;
+       private final int bufferSize;
+       private final long blockSize;
+       private final boolean addBlockPadding;
+       private final boolean writeVariableLengthBlocks;
+
+       private CompressionCodec codec;
+       private FSDataOutputStream out;
+       private long headerLength;
+       private long stripeStart;
+       private long blockOffset;
+       private int metadataLength;
+       private int footerLength;
+
+       PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts) 
throws IOException {
+               if (opts.isEnforceBufferSize()) {
+                       this.bufferSize = opts.getBufferSize();
+               } else {
+                       this.bufferSize = getEstimatedBufferSize(
+                               opts.getStripeSize(), 
opts.getSchema().getMaximumId() + 1, opts.getBufferSize());
+               }
+
+               this.out = out;
+               this.blockOffset = 0;
+               this.blockSize = opts.getBlockSize();
+               this.maxPadding = (int) (opts.getPaddingTolerance() * (double) 
opts.getBufferSize());
+               this.compress = opts.getCompress();
+               this.codec = OrcCodecPool.getCodec(this.compress);
+               this.streams  = new TreeMap<>();
+               this.writer = new OutStream("metadata", this.bufferSize, 
this.codec, new DirectStream(this.out));
+               this.shims = opts.getHadoopShims();
+               this.addBlockPadding = opts.getBlockPadding();
+               this.protobufWriter = 
CodedOutputStream.newInstance(this.writer);
+               this.writeVariableLengthBlocks = 
opts.getWriteVariableLengthBlocks();
+       }
+
+       @Override
+       public void writeHeader() throws IOException {
+               this.out.write("ORC".getBytes());
+               this.headerLength = this.out.getPos();
+       }
+
+       @Override
+       public OutputReceiver createDataStream(StreamName name) throws 
IOException {
+               BufferedStream result = streams.get(name);
+
+               if (result == null) {
+                       result = new BufferedStream();
+                       streams.put(name, result);
+               }
+
+               return result;
+       }
+
+       @Override
+       public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index,
+                                                       CompressionCodec codec) 
throws IOException {
+               OutputStream stream = new OutStream(this.toString(), 
bufferSize, codec, createDataStream(name));
+               index.build().writeTo(stream);
+               stream.flush();
+       }
+
+       @Override
+       public void writeBloomFilter(StreamName name, 
OrcProto.BloomFilterIndex.Builder bloom,
+                                                               
CompressionCodec codec) throws IOException {
+               OutputStream stream = new OutStream(this.toString(), 
bufferSize, codec, createDataStream(name));
+               bloom.build().writeTo(stream);
+               stream.flush();
+       }
+
+       @Override
+       public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder,
+                                                               
OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+               long indexSize = 0;
+               long dataSize = 0;
+
+               for (Map.Entry<StreamName, BufferedStream> pair: 
streams.entrySet()) {
+                       BufferedStream receiver = pair.getValue();
+                       if (!receiver.isSuppressed) {
+                               long streamSize = receiver.getOutputSize();
+                               StreamName name = pair.getKey();
+                               
footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn())
+                                       
.setKind(name.getKind()).setLength(streamSize));
+                               if (StreamName.Area.INDEX == name.getArea()) {
+                                       indexSize += streamSize;
+                               } else {
+                                       dataSize += streamSize;
+                               }
+                       }
+               }
+
+               dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
+               OrcProto.StripeFooter footer = footerBuilder.build();
+               // Do we need to pad the file so the stripe doesn't straddle a 
block boundary?
+               padStripe(indexSize + dataSize + footer.getSerializedSize());
+
+               // write out the data streams
+               for (Map.Entry<StreamName, BufferedStream> pair : 
streams.entrySet()) {
+                       pair.getValue().spillToDiskAndClear(out);
+               }
+
+               // Write out the footer.
+               writeStripeFooter(footer, dataSize, indexSize, dirEntry);
+       }
+
+       @Override
+       public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws 
IOException {
+               long startPosition = out.getPos();
+               OrcProto.Metadata metadata = builder.build();
+               metadata.writeTo(protobufWriter);
+               protobufWriter.flush();
+               writer.flush();
+               this.metadataLength = (int) (out.getPos() - startPosition);
+       }
+
+       /**
+        * Fills in the content and header lengths, writes the file footer through the
+        * protobuf writer and records how far the output position advanced as the footer length.
+        *
+        * @param builder builder holding the footer to complete and serialize
+        * @throws IOException if writing or flushing fails
+        */
+       @Override
+       public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+               // Nothing is written between reading this position and writing the footer,
+               // so it serves both as the end of the content and as the start of the footer.
+               final long footerStart = out.getPos();
+               builder
+                       .setContentLength(footerStart - metadataLength)
+                       .setHeaderLength(headerLength);
+
+               builder.build().writeTo(protobufWriter);
+               protobufWriter.flush();
+               writer.flush();
+
+               this.footerLength = (int) (out.getPos() - footerStart);
+       }
+
+       /**
+        * Writes the PostScript directly to the output stream, followed by a single byte
+        * holding its serialized length.
+        *
+        * @param builder builder holding the PostScript; footer and metadata lengths are set here
+        * @return the output position after the length byte has been written
+        * @throws IllegalArgumentException if the serialized PostScript is longer than 255 bytes
+        * @throws IOException if writing fails
+        */
+       @Override
+       public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+               final OrcProto.PostScript postScript = builder
+                       .setFooterLength(footerLength)
+                       .setMetadataLength(metadataLength)
+                       .build();
+
+               // The PostScript must be written uncompressed, hence straight to the raw stream.
+               final long before = out.getPos();
+               postScript.writeTo(out);
+               final long postScriptSize = out.getPos() - before;
+
+               // The size has to fit into the single trailing byte; the check runs after the write.
+               if (postScriptSize > 255) {
+                       throw new IllegalArgumentException("PostScript too large at " + postScriptSize);
+               }
+
+               out.write((int) postScriptSize);
+               return out.getPos();
+       }
+
+       /**
+        * Hands the compression codec back to the codec pool and drops the reference to it.
+        *
+        * <p>The internal stream is intentionally left open: closing it here would cause
+        * "Stream Closed" or ClosedChannelException errors when Flink performs a checkpoint.
+        */
+       @Override
+       public void close() {
+               OrcCodecPool.returnCodec(compress, this.codec);
+               this.codec = null;
+       }
+
+       /**
+        * Flushes the underlying output stream.
+        *
+        * @throws IOException if the stream cannot be flushed
+        */
+       @Override
+       public void flush() throws IOException {
+               this.out.flush();
+       }
+
+       /**
+        * Appends an already serialized stripe to the output and records its start offset.
+        *
+        * <p>When block padding is enabled and the stripe is smaller than a block but larger
+        * than the space left in the current block, the rest of the block is filled with
+        * zero bytes first, so the recorded offset lands on the next block boundary.
+        *
+        * @param buffer array-backed buffer whose remaining bytes form the stripe
+        * @param dirEntry stripe information builder that receives the stripe offset
+        * @throws IOException if writing fails
+        */
+       @Override
+       public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+               long stripeOffset = out.getPos();
+               final int stripeLength = buffer.remaining();
+               final long freeInBlock = blockSize - (stripeOffset % blockSize);
+
+               // see if stripe can fit in the current hdfs block, else pad the remaining
+               // space in the block
+               final boolean smallerThanBlock = stripeLength < blockSize;
+               if (smallerThanBlock && stripeLength > freeInBlock && addBlockPadding) {
+                       final byte[] zeros = new byte[(int) Math.min(HDFS_BUFFER_SIZE, freeInBlock)];
+                       LOG.info(String.format("Padding ORC by %d bytes while merging..", freeInBlock));
+                       stripeOffset += freeInBlock;
+                       for (long left = freeInBlock; left > 0; ) {
+                               final int chunk = (int) Math.min(left, zeros.length);
+                               out.write(zeros, 0, chunk);
+                               left -= chunk;
+                       }
+               }
+
+               out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), stripeLength);
+               dirEntry.setOffset(stripeOffset);
+       }
+
+       /**
+        * Returns the compression codec currently held by this writer.
+        *
+        * @return the codec, or {@code null} once {@link #close()} has released it
+        */
+       @Override
+       public CompressionCodec getCompressionCodec() {
+               return codec;
+       }
+
+       /**
+        * Sums the buffered output size of all non-suppressed, non-index streams of a column.
+        *
+        * @param column id of the column whose streams are measured
+        * @return number of buffered bytes for that column, index streams excluded
+        */
+       @Override
+       public long getFileBytes(int column) {
+               long total = 0;
+
+               for (final Map.Entry<StreamName, BufferedStream> entry : streams.entrySet()) {
+                       final BufferedStream stream = entry.getValue();
+                       if (stream.isSuppressed) {
+                               continue;
+                       }
+
+                       final StreamName streamName = entry.getKey();
+                       if (streamName.getColumn() != column || streamName.getArea() == StreamName.Area.INDEX) {
+                               continue;
+                       }
+
+                       total += stream.getOutputSize();
+               }
+
+               return total;
+       }
+
+       /**
+        * Records the start position of the next stripe and, when the stripe would reach or
+        * cross the end of the current block, either ends the block early or pads it with zeros.
+        *
+        * @param stripeSize number of bytes the stripe is about to occupy
+        * @throws IOException if querying the position or writing the padding fails
+        */
+       private void padStripe(long stripeSize) throws IOException {
+               this.stripeStart = out.getPos();
+               final long usedInBlock = (stripeStart - blockOffset) % blockSize;
+
+               // We only have options if this isn't the first stripe in the block,
+               // and nothing needs to be done while the stripe still fits.
+               if (usedInBlock <= 0 || usedInBlock + stripeSize < blockSize) {
+                       return;
+               }
+
+               // Try making a short block
+               if (writeVariableLengthBlocks && shims.endVariableLengthBlock(out)) {
+                       blockOffset = stripeStart;
+                       return;
+               }
+
+               if (!addBlockPadding) {
+                       return;
+               }
+
+               // The stripe crosses the block boundary: pad up to it, but only when the
+               // gap does not exceed the allowed maximum padding.
+               final long gap = blockSize - usedInBlock;
+               if (gap <= maxPadding) {
+                       writeZeros(out, gap);
+                       stripeStart += gap;
+               }
+       }
+
+       /**
+        * Writes the stripe footer through the protobuf writer and stores the stripe offset
+        * and the footer length in the given stripe directory entry.
+        *
+        * @param footer stripe footer to serialize
+        * @param dataSize number of data bytes already accounted for in this stripe
+        * @param indexSize number of index bytes already accounted for in this stripe
+        * @param dirEntry stripe information builder to update
+        * @throws IOException if writing or flushing fails
+        */
+       private void writeStripeFooter(
+                       OrcProto.StripeFooter footer,
+                       long dataSize,
+                       long indexSize,
+                       OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+               footer.writeTo(protobufWriter);
+               protobufWriter.flush();
+               writer.flush();
+
+               // Everything written since stripeStart that is neither data nor index is footer.
+               final long stripeBytes = out.getPos() - stripeStart;
+               dirEntry
+                       .setOffset(stripeStart)
+                       .setFooterLength(stripeBytes - dataSize - indexSize);
+       }
+
+       /**
+        * Writes {@code remaining} bytes taken from the shared {@code ZEROS} buffer to the
+        * stream, in chunks of at most {@code ZEROS.length} bytes.
+        *
+        * @param output stream to write to
+        * @param remaining number of bytes to write; nothing is written when not positive
+        * @throws IOException if writing fails
+        */
+       private static void writeZeros(OutputStream output, long remaining) throws IOException {
+               for (long left = remaining; left > 0; ) {
+                       final int chunk = (int) Math.min(ZEROS.length, left);
+                       output.write(ZEROS, 0, chunk);
+                       left -= chunk;
+               }
+       }
+
+       /**
+        * Output receiver that passes every buffer straight through to the wrapped stream.
+        */
+       private static class DirectStream implements OutputReceiver {
+               private final FSDataOutputStream output;
+
+               DirectStream(FSDataOutputStream output) {
+                       this.output = output;
+               }
+
+               /**
+                * Writes the remaining bytes of the array-backed buffer to the wrapped stream.
+                *
+                * @param buffer buffer whose remaining bytes are written
+                * @throws IOException if the write fails
+                */
+               @Override
+               public void output(ByteBuffer buffer) throws IOException {
+                       this.output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+               }
+
+               /**
+                * Always fails: a direct stream cannot be suppressed.
+                *
+                * @throws UnsupportedOperationException on every call
+                */
+               @Override
+               public void suppress() {
+                       throw new UnsupportedOperationException("Can't suppress direct stream");
+               }
+       }
+
+       /**
+        * Output receiver that collects buffers in memory until they are spilled to a stream.
+        */
+       private static final class BufferedStream implements OutputReceiver {
+               private boolean isSuppressed = false;
+               private final List<ByteBuffer> output = new ArrayList<>();
+
+               /**
+                * Keeps the buffer for a later spill; buffers arriving while suppressed are dropped.
+                */
+               @Override
+               public void output(ByteBuffer buffer) {
+                       if (!isSuppressed) {
+                               output.add(buffer);
+                       }
+               }
+
+               /**
+                * Marks the stream as suppressed and discards everything buffered so far.
+                */
+               @Override
+               public void suppress() {
+                       isSuppressed = true;
+                       output.clear();
+               }
+
+               /**
+                * Writes all buffered data to the given stream unless the stream is suppressed,
+                * then clears the suppressed flag so the stream can be used again.
+                *
+                * @param raw stream receiving the buffered bytes
+                * @throws IOException if a write fails
+                */
+               void spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
+                       if (!isSuppressed) {
+                               for (ByteBuffer buffer : output) {
+                                       raw.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+                               }
+                               output.clear();
+                       }
+                       isSuppressed = false;
+               }
+
+               /**
+                * Returns the total number of remaining bytes across all currently held buffers.
+                */
+               public long getOutputSize() {
+                       long result = 0;
+                       for (ByteBuffer buffer : output) {
+                               result += buffer.remaining();
+                       }
+                       return result;
+               }
+       }
+
+}
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/data/Record.java 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/data/Record.java
new file mode 100644
index 0000000..f5561a09
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/data/Record.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.data;
+
+import java.io.Serializable;
+
+/**
+ * A sample type used for the integration test case.
+ *
+ * <p>Instances are immutable; two records are equal when both name and age match.
+ */
+public class Record implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       private final String name;
+       private final int age;
+
+       public Record(String name, int age) {
+               this.name = name;
+               this.age = age;
+       }
+
+       public String getName() {
+               return this.name;
+       }
+
+       public int getAge() {
+               return this.age;
+       }
+
+       /**
+        * Compares by value; a {@code null} name only equals another {@code null} name.
+        */
+       @Override
+       public boolean equals(Object other) {
+               if (this == other) {
+                       return true;
+               }
+
+               if (!(other instanceof Record)) {
+                       return false;
+               }
+
+               Record otherRecord = (Record) other;
+
+               return this.age == otherRecord.age
+                       && java.util.Objects.equals(this.name, otherRecord.name);
+       }
+
+       /**
+        * Hashes the same fields that {@link #equals(Object)} compares, so equal records
+        * always share a hash code.
+        */
+       @Override
+       public int hashCode() {
+               return java.util.Objects.hash(name, age);
+       }
+
+       /**
+        * Returns a readable form of the record, useful in assertion failure messages.
+        */
+       @Override
+       public String toString() {
+               return "Record{name=" + name + ", age=" + age + '}';
+       }
+}
+
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java
new file mode 100644
index 0000000..7503a85
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.util;
+
+import org.apache.flink.orc.data.Record;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Util class for the OrcBulkWriter tests.
+ */
+public class OrcBulkWriterTestUtil {
+
+       public static final String USER_METADATA_KEY = "userKey";
+       public static final ByteBuffer USER_METADATA_VALUE =
+               ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
+
+       /** Static helpers only; not meant to be instantiated. */
+       private OrcBulkWriterTestUtil() {
+       }
+
+       /**
+        * Verifies that the output directory contains exactly one bucket and that every part
+        * file in it is a non-empty, LZ4-compressed ORC file with two fields, the user
+        * metadata key and exactly the expected records.
+        *
+        * @param files output directory that the sink wrote to
+        * @param expected records every part file is expected to contain, in order
+        * @throws IOException if a part file cannot be read
+        */
+       public static void validate(File files, List<Record> expected) throws IOException {
+               final File[] buckets = files.listFiles();
+               assertNotNull(buckets);
+               assertEquals(1, buckets.length);
+
+               final File[] partFiles = buckets[0].listFiles();
+               assertNotNull(partFiles);
+
+               for (File partFile : partFiles) {
+                       assertTrue(partFile.length() > 0);
+
+                       OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(new Configuration());
+                       Reader reader = OrcFile.createReader(new org.apache.hadoop.fs.Path(partFile.toURI()), readerOptions);
+
+                       assertEquals(expected.size(), reader.getNumberOfRows());
+                       assertEquals(2, reader.getSchema().getFieldNames().size());
+                       assertSame(CompressionKind.LZ4, reader.getCompressionKind());
+                       assertTrue(reader.hasMetadataValue(USER_METADATA_KEY));
+                       assertTrue(reader.getMetadataKeys().contains(USER_METADATA_KEY));
+
+                       List<Record> results = getResults(reader);
+
+                       assertEquals(expected.size(), results.size());
+                       assertEquals(expected, results);
+               }
+       }
+
+       /**
+        * Reads all rows of the file into {@link Record} instances, taking the name from
+        * column 0 and the age from column 1.
+        */
+       private static List<Record> getResults(Reader reader) throws IOException {
+               List<Record> results = new ArrayList<>();
+
+               RecordReader recordReader = reader.rows();
+               try {
+                       VectorizedRowBatch batch = reader.getSchema().createRowBatch();
+
+                       while (recordReader.nextBatch(batch)) {
+                               BytesColumnVector stringVector = (BytesColumnVector) batch.cols[0];
+                               LongColumnVector intVector = (LongColumnVector) batch.cols[1];
+                               for (int r = 0; r < batch.size; r++) {
+                                       String name = new String(
+                                               stringVector.vector[r], stringVector.start[r], stringVector.length[r], StandardCharsets.UTF_8);
+                                       int age = (int) intVector.vector[r];
+
+                                       results.add(new Record(name, age));
+                               }
+                       }
+               } finally {
+                       // Close only after the last batch; closing inside the loop would make the
+                       // next nextBatch() call operate on an already closed reader.
+                       recordReader.close();
+               }
+
+               return results;
+       }
+}
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/vector/RecordVectorizer.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/vector/RecordVectorizer.java
new file mode 100644
index 0000000..5a0e83f
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/vector/RecordVectorizer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.vector;
+
+import org.apache.flink.orc.data.Record;
+import org.apache.flink.orc.util.OrcBulkWriterTestUtil;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A {@link Vectorizer} used in tests that appends {@link Record} elements to a
+ * VectorizedRowBatch.
+ *
+ * <p>The name is stored as UTF-8 bytes in column 0 and the age in column 1.
+ */
+public class RecordVectorizer extends Vectorizer<Record> implements Serializable {
+
+       public RecordVectorizer(String schema) {
+               super(schema);
+       }
+
+       /**
+        * Writes the element into the next free row of the batch and registers the test
+        * user metadata entry.
+        */
+       @Override
+       public void vectorize(Record element, VectorizedRowBatch batch) throws IOException {
+               final BytesColumnVector nameColumn = (BytesColumnVector) batch.cols[0];
+               final LongColumnVector ageColumn = (LongColumnVector) batch.cols[1];
+
+               // Claim the next free row for this element.
+               final int rowIndex = batch.size;
+               batch.size = rowIndex + 1;
+
+               final byte[] nameBytes = element.getName().getBytes(StandardCharsets.UTF_8);
+               nameColumn.setVal(rowIndex, nameBytes);
+               ageColumn.vector[rowIndex] = element.getAge();
+
+               addUserMetadata(OrcBulkWriterTestUtil.USER_METADATA_KEY, OrcBulkWriterTestUtil.USER_METADATA_VALUE);
+       }
+
+}
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java
new file mode 100644
index 0000000..5dce82e
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.data.Record;
+import org.apache.flink.orc.util.OrcBulkWriterTestUtil;
+import org.apache.flink.orc.vector.RecordVectorizer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Integration test that runs a small streaming job writing {@link Record}s in ORC bulk
+ * format through the StreamingFileSink and then validates the produced files.
+ */
+public class OrcBulkWriterITCase extends TestLogger {
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+       private final String schema = "struct<_col0:string,_col1:int>";
+       private final List<Record> testData = Arrays.asList(
+               new Record("Sourav", 41),
+               new Record("Saul", 35),
+               new Record("Kim", 31));
+
+       @Test
+       public void testOrcBulkWriter() throws Exception {
+               final File outDir = TEMPORARY_FOLDER.newFolder();
+
+               // Sink: ORC writer with LZ4 compression, writing below the temporary folder.
+               final Properties writerProps = new Properties();
+               writerProps.setProperty("orc.compress", "LZ4");
+               final OrcBulkWriterFactory<Record> factory = new OrcBulkWriterFactory<>(
+                       new RecordVectorizer(schema), writerProps, new Configuration());
+               final StreamingFileSink<Record> sink = StreamingFileSink
+                       .forBulkFormat(new Path(outDir.toURI()), factory)
+                       .build();
+
+               // Job: single parallel instance with checkpointing every 100 ms.
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.enableCheckpointing(100);
+
+               final DataStream<Record> source = env.addSource(
+                       new FiniteTestSource<>(testData), TypeInformation.of(Record.class));
+               source.map(element -> element).addSink(sink);
+
+               env.execute();
+
+               OrcBulkWriterTestUtil.validate(outDir, testData);
+       }
+}
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java
new file mode 100644
index 0000000..5a777a7
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.data.Record;
+import org.apache.flink.orc.util.OrcBulkWriterTestUtil;
+import org.apache.flink.orc.vector.RecordVectorizer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Unit test that drives the ORC BulkWriter through a StreamingFileSink inside an
+ * operator test harness and validates the written files.
+ */
+public class OrcBulkWriterTest {
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+       private final String schema = "struct<_col0:string,_col1:int>";
+       private final List<Record> input = Arrays.asList(
+               new Record("Shiv", 44),
+               new Record("Jesse", 23),
+               new Record("Walt", 50));
+
+       @Test
+       public void testOrcBulkWriter() throws Exception {
+               final File outDir = TEMPORARY_FOLDER.newFolder();
+
+               final Properties writerProps = new Properties();
+               writerProps.setProperty("orc.compress", "LZ4");
+               final OrcBulkWriterFactory<Record> writer = new OrcBulkWriterFactory<>(
+                       new RecordVectorizer(schema), writerProps, new Configuration());
+
+               final StreamingFileSink<Record> sink = StreamingFileSink
+                       .forBulkFormat(new Path(outDir.toURI()), writer)
+                       .withBucketCheckInterval(10000)
+                       .build();
+
+               try (OneInputStreamOperatorTestHarness<Record, Object> testHarness =
+                               new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 1, 1, 0)) {
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       // Elements get the timestamps 1..n, the snapshot the next one after that.
+                       final int count = input.size();
+                       for (int i = 0; i < count; i++) {
+                               testHarness.processElement(input.get(i), i + 1);
+                       }
+
+                       testHarness.snapshot(1, count + 1);
+                       testHarness.notifyOfCompletedCheckpoint(1);
+
+                       OrcBulkWriterTestUtil.validate(outDir, input);
+               }
+       }
+
+}

Reply via email to