This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 2dfc455 Spark: support ORC writes (#857)
2dfc455 is described below
commit 2dfc4551182f5b210755792b6624a3f5a20263e3
Author: dingxiaokun <[email protected]>
AuthorDate: Sat Apr 4 08:02:59 2020 +0800
Spark: support ORC writes (#857)
Fixes #855.
---
.../org/apache/iceberg/spark/source/Writer.java | 14 +++-
...stParquetWrite.java => TestSparkDataWrite.java} | 88 ++++++++++++++++------
2 files changed, 77 insertions(+), 25 deletions(-)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 844c681..88f8a24 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -52,8 +52,10 @@ import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
@@ -309,6 +311,14 @@ class Writer implements DataSourceWriter {
.overwrite()
.build();
+ case ORC:
+ return ORC.write(file)
+ .createWriterFunc(SparkOrcWriter::new)
+ .setAll(properties)
+ .schema(dsSchema)
+ .overwrite()
+ .build();
+
default:
throw new UnsupportedOperationException("Cannot write unknown
format: " + fileFormat);
}
@@ -389,7 +399,9 @@ class Writer implements DataSourceWriter {
public abstract void write(InternalRow row) throws IOException;
public void writeInternal(InternalRow row) throws IOException {
- if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >=
targetFileSize) {
+ // TODO: ORC writers do not yet support rolling to a new file at the target file size before close
+ if (!format.equals(FileFormat.ORC) &&
+ currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >=
targetFileSize) {
closeCurrent();
openCurrent();
}
diff --git
a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
similarity index 81%
rename from
spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
rename to
spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 7fa335a..f6bcb66 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++
b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -23,8 +23,10 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.PartitionSpec;
@@ -43,11 +45,16 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.apache.iceberg.types.Types.NestedField.optional;
-public class TestParquetWrite {
+@RunWith(Parameterized.class)
+public class TestSparkDataWrite {
private static final Configuration CONF = new Configuration();
+ private final FileFormat format;
+ private static SparkSession spark = null;
private static final Schema SCHEMA = new Schema(
optional(1, "id", Types.IntegerType.get()),
optional(2, "data", Types.StringType.get())
@@ -56,23 +63,34 @@ public class TestParquetWrite {
@Rule
public TemporaryFolder temp = new TemporaryFolder();
- private static SparkSession spark = null;
+ @Parameterized.Parameters
+ public static Object[][] parameters() {
+ return new Object[][] {
+ new Object[] { "parquet" },
+ new Object[] { "avro" },
+ new Object[] { "orc" }
+ };
+ }
@BeforeClass
public static void startSpark() {
- TestParquetWrite.spark =
SparkSession.builder().master("local[2]").getOrCreate();
+ TestSparkDataWrite.spark =
SparkSession.builder().master("local[2]").getOrCreate();
}
@AfterClass
public static void stopSpark() {
- SparkSession currentSpark = TestParquetWrite.spark;
- TestParquetWrite.spark = null;
+ SparkSession currentSpark = TestSparkDataWrite.spark;
+ TestSparkDataWrite.spark = null;
currentSpark.stop();
}
+ public TestSparkDataWrite(String format) {
+ this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+ }
+
@Test
public void testBasicWrite() throws IOException {
- File parent = temp.newFolder("parquet");
+ File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");
HadoopTables tables = new HadoopTables(CONF);
@@ -86,10 +104,10 @@ public class TestParquetWrite {
);
Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
-
// TODO: incoming columns must be ordered according to the table's schema
df.select("id", "data").write()
.format("iceberg")
+ .option("write-format", format.toString())
.mode("append")
.save(location.toString());
@@ -104,20 +122,26 @@ public class TestParquetWrite {
Assert.assertEquals("Result rows should match", expected, actual);
for (ManifestFile manifest : table.currentSnapshot().manifests()) {
for (DataFile file : ManifestReader.read(manifest, table.io())) {
- Assert.assertNotNull("Split offsets not present", file.splitOffsets());
+ // TODO: Avro does not support split offsets
+ if (!format.equals(FileFormat.AVRO)) {
+ Assert.assertNotNull("Split offsets not present",
file.splitOffsets());
+ }
Assert.assertEquals("Should have reported record count as 1", 1,
file.recordCount());
- Assert.assertNotNull("Column sizes metric not present",
file.columnSizes());
- Assert.assertNotNull("Counts metric not present", file.valueCounts());
- Assert.assertNotNull("Null value counts metric not present",
file.nullValueCounts());
- Assert.assertNotNull("Lower bounds metric not present",
file.lowerBounds());
- Assert.assertNotNull("Upper bounds metric not present",
file.upperBounds());
+ // TODO: add metrics support for the remaining file formats
+ if (format.equals(FileFormat.PARQUET)) {
+ Assert.assertNotNull("Column sizes metric not present",
file.columnSizes());
+ Assert.assertNotNull("Counts metric not present",
file.valueCounts());
+ Assert.assertNotNull("Null value counts metric not present",
file.nullValueCounts());
+ Assert.assertNotNull("Lower bounds metric not present",
file.lowerBounds());
+ Assert.assertNotNull("Upper bounds metric not present",
file.upperBounds());
+ }
}
}
}
@Test
public void testAppend() throws IOException {
- File parent = temp.newFolder("parquet");
+ File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");
HadoopTables tables = new HadoopTables(CONF);
@@ -143,11 +167,13 @@ public class TestParquetWrite {
df.select("id", "data").write()
.format("iceberg")
+ .option("write-format", format.toString())
.mode("append")
.save(location.toString());
df.withColumn("id", df.col("id").plus(3)).select("id", "data").write()
.format("iceberg")
+ .option("write-format", format.toString())
.mode("append")
.save(location.toString());
@@ -164,7 +190,7 @@ public class TestParquetWrite {
@Test
public void testOverwrite() throws IOException {
- File parent = temp.newFolder("parquet");
+ File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");
HadoopTables tables = new HadoopTables(CONF);
@@ -189,12 +215,14 @@ public class TestParquetWrite {
df.select("id", "data").write()
.format("iceberg")
+ .option("write-format", format.toString())
.mode("append")
.save(location.toString());
// overwrite with 2*id to replace record 2, append 4 and 6
df.withColumn("id", df.col("id").multiply(2)).select("id", "data").write()
.format("iceberg")
+ .option("write-format", format.toString())
.mode("overwrite")
.save(location.toString());
@@ -211,7 +239,7 @@ public class TestParquetWrite {
@Test
public void testUnpartitionedOverwrite() throws IOException {
- File parent = temp.newFolder("parquet");
+ File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");
HadoopTables tables = new HadoopTables(CONF);
@@ -228,12 +256,14 @@ public class TestParquetWrite {
df.select("id", "data").write()
.format("iceberg")
+ .option("write-format", format.toString())
.mode("append")
.save(location.toString());
// overwrite with the same data; should not produce two copies
df.select("id", "data").write()
.format("iceberg")
+ .option("write-format", format.toString())
.mode("overwrite")
.save(location.toString());
@@ -250,7 +280,7 @@ public class TestParquetWrite {
@Test
public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties()
throws IOException {
- File parent = temp.newFolder("parquet");
+ File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");
HadoopTables tables = new HadoopTables(CONF);
@@ -270,6 +300,7 @@ public class TestParquetWrite {
df.select("id", "data").write()
.format("iceberg")
+ .option("write-format", format.toString())
.mode("append")
.save(location.toString());
@@ -289,13 +320,16 @@ public class TestParquetWrite {
files.add(file);
}
}
- Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
- Assert.assertTrue("All DataFiles contain 1000 rows",
files.stream().allMatch(d -> d.recordCount() == 1000));
+ // TODO: ORC does not yet support the target file size option
+ if (!format.equals(FileFormat.ORC)) {
+ Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+ Assert.assertTrue("All DataFiles contain 1000 rows",
files.stream().allMatch(d -> d.recordCount() == 1000));
+ }
}
@Test
public void testPartitionedCreateWithTargetFileSizeViaOption() throws
IOException {
- File parent = temp.newFolder("parquet");
+ File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");
HadoopTables tables = new HadoopTables(CONF);
@@ -314,6 +348,7 @@ public class TestParquetWrite {
df.select("id", "data").sort("data").write()
.format("iceberg")
+ .option("write-format", format.toString())
.mode("append")
.option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
.save(location.toString());
@@ -334,13 +369,16 @@ public class TestParquetWrite {
files.add(file);
}
}
- Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
- Assert.assertTrue("All DataFiles contain 1000 rows",
files.stream().allMatch(d -> d.recordCount() == 1000));
+ // TODO: ORC does not yet support the target file size option
+ if (!format.equals(FileFormat.ORC)) {
+ Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+ Assert.assertTrue("All DataFiles contain 1000 rows",
files.stream().allMatch(d -> d.recordCount() == 1000));
+ }
}
@Test
public void testWriteProjection() throws IOException {
- File parent = temp.newFolder("parquet");
+ File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");
HadoopTables tables = new HadoopTables(CONF);
@@ -357,6 +395,7 @@ public class TestParquetWrite {
df.select("id").write() // select only id column
.format("iceberg")
+ .option("write-format", format.toString())
.mode("append")
.save(location.toString());
@@ -373,7 +412,7 @@ public class TestParquetWrite {
@Test
public void testWriteProjectionWithMiddle() throws IOException {
- File parent = temp.newFolder("parquet");
+ File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");
HadoopTables tables = new HadoopTables(CONF);
@@ -395,6 +434,7 @@ public class TestParquetWrite {
df.select("c1", "c3").write()
.format("iceberg")
+ .option("write-format", format.toString())
.mode("append")
.save(location.toString());