This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d2850ec  Spark: Remove hardcoded file size from data source options test (#2234)
d2850ec is described below
commit d2850ecf025cbc4001afc7f9a051040361faac4e
Author: Russell Spitzer <[email protected]>
AuthorDate: Fri Feb 12 22:37:24 2021 -0600
Spark: Remove hardcoded file size from data source options test (#2234)
---
.../iceberg/spark/source/TestDataSourceOptions.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index aea1687..d81982b 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -20,10 +20,12 @@
package org.apache.iceberg.spark.source;
import java.io.IOException;
+import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
@@ -35,6 +37,7 @@ import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
@@ -185,21 +188,29 @@ public abstract class TestDataSourceOptions {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.SPLIT_SIZE, String.valueOf(128L * 1024 * 1024)); // 128Mb
- tables.create(SCHEMA, spec, options, tableLocation);
+ Table icebergTable = tables.create(SCHEMA, spec, options, tableLocation);
List<SimpleRecord> expectedRecords = Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b")
);
Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
- originalDf.select("id", "data").write()
+ originalDf.select("id", "data")
+ .repartition(1)
+ .write()
.format("iceberg")
.mode("append")
.save(tableLocation);
+ List<DataFile> files = Lists.newArrayList(icebergTable.currentSnapshot().addedFiles());
+ Assert.assertEquals("Should have written 1 file", 1, files.size());
+
+ long fileSize = files.get(0).fileSizeInBytes();
+ long splitSize = LongMath.divide(fileSize, 2, RoundingMode.CEILING);
+
Dataset<Row> resultDf = spark.read()
.format("iceberg")
- .option(SparkReadOptions.SPLIT_SIZE, String.valueOf(611)) // 611 bytes is the size of SimpleRecord(1,"a")
+ .option(SparkReadOptions.SPLIT_SIZE, String.valueOf(splitSize))
.load(tableLocation);
Assert.assertEquals("Spark partitions should match", 2, resultDf.javaRDD().getNumPartitions());