This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 700fc9e Core: Add specId to DataFile (#1317)
700fc9e is described below
commit 700fc9eaf91317e08287bbc23c5088494e74184a
Author: Ryan Blue <[email protected]>
AuthorDate: Tue Aug 11 16:02:14 2020 -0700
Core: Add specId to DataFile (#1317)
---
.../main/java/org/apache/iceberg/ContentFile.java | 5 ++
.../test/java/org/apache/iceberg/TestHelpers.java | 5 ++
.../java/org/apache/iceberg/AllManifestsTable.java | 7 +-
.../src/main/java/org/apache/iceberg/BaseFile.java | 14 +++-
.../main/java/org/apache/iceberg/DataFiles.java | 85 +++-------------------
.../main/java/org/apache/iceberg/FileMetadata.java | 15 +---
.../java/org/apache/iceberg/GenericDataFile.java | 20 +----
.../java/org/apache/iceberg/GenericDeleteFile.java | 4 +-
.../apache/iceberg/InheritableMetadataFactory.java | 10 ++-
.../java/org/apache/iceberg/StaticDataTask.java | 2 +-
.../main/java/org/apache/iceberg/V1Metadata.java | 5 ++
.../main/java/org/apache/iceberg/V2Metadata.java | 5 ++
.../apache/iceberg/TestManifestWriterVersions.java | 4 +-
.../java/org/apache/iceberg/TestMergeAppend.java | 26 ++++---
.../org/apache/iceberg/TestRewriteManifests.java | 32 +++++---
.../org/apache/iceberg/data/TestLocalScan.java | 7 +-
.../org/apache/iceberg/spark/SparkDataFile.java | 5 ++
.../iceberg/spark/source/TestPartitionValues.java | 10 ++-
18 files changed, 123 insertions(+), 138 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java
b/api/src/main/java/org/apache/iceberg/ContentFile.java
index 262ea17..11d4a4d 100644
--- a/api/src/main/java/org/apache/iceberg/ContentFile.java
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -30,6 +30,11 @@ import java.util.Map;
*/
public interface ContentFile<F> {
/**
+ * @return id of the partition spec used for partition metadata
+ */
+ int specId();
+
+ /**
* @return type of content stored in the file; one of DATA,
POSITION_DELETES, or EQUALITY_DELETES
*/
FileContent content();
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java
b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index b4881b3..c1fce01 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -312,6 +312,11 @@ public class TestHelpers {
}
@Override
+ public int specId() {
+ return 0;
+ }
+
+ @Override
public CharSequence path() {
return path;
}
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index 8194338..367826f 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -128,8 +128,13 @@ public class AllManifestsTable extends BaseMetadataTable {
if (snap.manifestListLocation() != null) {
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() :
rowFilter;
ResidualEvaluator residuals =
ResidualEvaluator.unpartitioned(filter);
+ DataFile manifestListAsDataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
+
.withInputFile(ops.io().newInputFile(snap.manifestListLocation()))
+ .withRecordCount(1)
+ .withFormat(FileFormat.AVRO)
+ .build();
return new ManifestListReadTask(ops.io(), table().spec(), new
BaseFileScanTask(
-
DataFiles.fromManifestList(ops.io().newInputFile(snap.manifestListLocation())),
null,
+ manifestListAsDataFile, null,
schemaString, specString, residuals));
} else {
return StaticDataTask.of(
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java
b/core/src/main/java/org/apache/iceberg/BaseFile.java
index a408c7a..db8320e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -53,6 +53,7 @@ abstract class BaseFile<F>
private int[] fromProjectionPos;
private Types.StructType partitionType;
+ private int partitionSpecId = -1;
private FileContent content = FileContent.DATA;
private String filePath = null;
private FileFormat format = null;
@@ -108,11 +109,12 @@ abstract class BaseFile<F>
this.partitionData = new PartitionData(partitionType);
}
- BaseFile(FileContent content, String filePath, FileFormat format,
+ BaseFile(int specId, FileContent content, String filePath, FileFormat format,
PartitionData partition, long fileSizeInBytes, long recordCount,
Map<Integer, Long> columnSizes, Map<Integer, Long> valueCounts,
Map<Integer, Long> nullValueCounts,
Map<Integer, ByteBuffer> lowerBounds, Map<Integer, ByteBuffer>
upperBounds, List<Long> splitOffsets,
ByteBuffer keyMetadata) {
+ this.partitionSpecId = specId;
this.content = content;
this.filePath = filePath;
this.format = format;
@@ -145,6 +147,7 @@ abstract class BaseFile<F>
* @param fullCopy whether to copy all fields or to drop column-level stats
*/
BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
+ this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
this.filePath = toCopy.filePath;
this.format = toCopy.format;
@@ -178,6 +181,15 @@ abstract class BaseFile<F>
BaseFile() {
}
+ @Override
+ public int specId() {
+ return partitionSpecId;
+ }
+
+ void setSpecId(int specId) {
+ this.partitionSpecId = specId;
+ }
+
protected abstract Schema getAvroSchema(Types.StructType partitionStruct);
@Override
diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java
b/core/src/main/java/org/apache/iceberg/DataFiles.java
index feb2303..b9fae62 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -94,88 +94,27 @@ public class DataFiles {
return copyPartitionData(spec, partition, null);
}
- public static DataFile fromStat(FileStatus stat, long rowCount) {
- String location = stat.getPath().toString();
- FileFormat format = FileFormat.fromFileName(location);
- return new GenericDataFile(location, format, rowCount, stat.getLen());
- }
-
- public static DataFile fromStat(FileStatus stat, PartitionData partition,
long rowCount) {
- String location = stat.getPath().toString();
- FileFormat format = FileFormat.fromFileName(location);
- return new GenericDataFile(
- location, format, partition, rowCount, stat.getLen());
- }
-
- public static DataFile fromStat(FileStatus stat, PartitionData partition,
Metrics metrics,
- EncryptionKeyMetadata keyMetadata, List<Long> splitOffsets) {
- String location = stat.getPath().toString();
- FileFormat format = FileFormat.fromFileName(location);
- return new GenericDataFile(
- location, format, partition, stat.getLen(), metrics,
keyMetadata.buffer(), splitOffsets);
- }
-
- public static DataFile fromInputFile(InputFile file, PartitionData
partition, long rowCount) {
- if (file instanceof HadoopInputFile) {
- return fromStat(((HadoopInputFile) file).getStat(), partition, rowCount);
- }
-
- String location = file.location();
- FileFormat format = FileFormat.fromFileName(location);
- return new GenericDataFile(
- location, format, partition, rowCount, file.getLength());
- }
-
- public static DataFile fromInputFile(InputFile file, long rowCount) {
- if (file instanceof HadoopInputFile) {
- return fromStat(((HadoopInputFile) file).getStat(), rowCount);
- }
-
- String location = file.location();
- FileFormat format = FileFormat.fromFileName(location);
- return new GenericDataFile(location, format, rowCount, file.getLength());
- }
-
- public static DataFile fromEncryptedOutputFile(EncryptedOutputFile
encryptedFile, PartitionData partition,
- Metrics metrics, List<Long>
splitOffsets) {
- EncryptionKeyMetadata keyMetadata = encryptedFile.keyMetadata();
- InputFile file = encryptedFile.encryptingOutputFile().toInputFile();
- if (encryptedFile instanceof HadoopInputFile) {
- return fromStat(((HadoopInputFile) file).getStat(), partition, metrics,
keyMetadata, splitOffsets);
- }
-
- String location = file.location();
- FileFormat format = FileFormat.fromFileName(location);
- return new GenericDataFile(
- location, format, partition, file.getLength(), metrics,
keyMetadata.buffer(), splitOffsets);
- }
-
public static DataFile fromManifest(ManifestFile manifest) {
Preconditions.checkArgument(
manifest.addedFilesCount() != null && manifest.existingFilesCount() !=
null,
"Cannot create data file from manifest: data file counts are
missing.");
- return new GenericDataFile(manifest.path(),
- FileFormat.AVRO,
- manifest.addedFilesCount() + manifest.existingFilesCount(),
- manifest.length());
- }
-
- public static DataFile fromManifestList(InputFile manifestList) {
- return new GenericDataFile(manifestList.location(), FileFormat.AVRO, 1,
manifestList.getLength());
+ return DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(manifest.path())
+ .withFormat(FileFormat.AVRO)
+ .withRecordCount(manifest.addedFilesCount() +
manifest.existingFilesCount())
+ .withFileSizeInBytes(manifest.length())
+ .build();
}
public static Builder builder(PartitionSpec spec) {
return new Builder(spec);
}
- static Builder builder() {
- return new Builder();
- }
-
public static class Builder {
private final PartitionSpec spec;
private final boolean isPartitioned;
+ private final int specId;
private PartitionData partitionData;
private String filePath = null;
private FileFormat format = null;
@@ -191,14 +130,9 @@ public class DataFiles {
private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;
- public Builder() {
- this.spec = null;
- this.partitionData = null;
- this.isPartitioned = false;
- }
-
public Builder(PartitionSpec spec) {
this.spec = spec;
+ this.specId = spec.specId();
this.isPartitioned = spec.fields().size() > 0;
this.partitionData = isPartitioned ? newPartitionData(spec) : null;
}
@@ -221,6 +155,7 @@ public class DataFiles {
public Builder copy(DataFile toCopy) {
if (isPartitioned) {
+ Preconditions.checkState(specId == toCopy.specId(), "Cannot copy a
DataFile with a different spec");
this.partitionData = copyPartitionData(spec, toCopy.partition(),
partitionData);
}
this.filePath = toCopy.path().toString();
@@ -338,7 +273,7 @@ public class DataFiles {
Preconditions.checkArgument(recordCount >= 0, "Record count is
required");
return new GenericDataFile(
- filePath, format, isPartitioned ? partitionData.copy() : null,
+ specId, filePath, format, isPartitioned ? partitionData.copy() :
null,
fileSizeInBytes, new Metrics(
recordCount, columnSizes, valueCounts, nullValueCounts,
lowerBounds, upperBounds),
keyMetadata, splitOffsets);
diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java
b/core/src/main/java/org/apache/iceberg/FileMetadata.java
index 854cf57..7ff31e0 100644
--- a/core/src/main/java/org/apache/iceberg/FileMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java
@@ -38,13 +38,10 @@ class FileMetadata {
return new Builder(spec);
}
- static Builder deleteFileBuilder() {
- return new Builder();
- }
-
public static class Builder {
private final PartitionSpec spec;
private final boolean isPartitioned;
+ private final int specId;
private FileContent content = null;
private PartitionData partitionData;
private String filePath = null;
@@ -60,14 +57,9 @@ class FileMetadata {
private Map<Integer, ByteBuffer> upperBounds = null;
private ByteBuffer keyMetadata = null;
- Builder() {
- this.spec = null;
- this.partitionData = null;
- this.isPartitioned = false;
- }
-
Builder(PartitionSpec spec) {
this.spec = spec;
+ this.specId = spec.specId();
this.isPartitioned = spec.fields().size() > 0;
this.partitionData = isPartitioned ? DataFiles.newPartitionData(spec) :
null;
}
@@ -89,6 +81,7 @@ class FileMetadata {
public Builder copy(DeleteFile toCopy) {
if (isPartitioned) {
+ Preconditions.checkState(specId == toCopy.specId(), "Cannot copy a
DeleteFile with a different spec");
this.partitionData = DataFiles.copyPartitionData(spec,
toCopy.partition(), partitionData);
}
this.content = toCopy.content();
@@ -208,7 +201,7 @@ class FileMetadata {
Preconditions.checkArgument(recordCount >= 0, "Record count is
required");
return new GenericDeleteFile(
- content, filePath, format, isPartitioned ? DataFiles.copy(spec,
partitionData) : null,
+ specId, content, filePath, format, isPartitioned ?
DataFiles.copy(spec, partitionData) : null,
fileSizeInBytes, new Metrics(
recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds,
upperBounds),
keyMetadata);
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 1871790..1075c63 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -34,26 +34,10 @@ class GenericDataFile extends BaseFile<DataFile> implements
DataFile {
super(avroSchema);
}
- GenericDataFile(String filePath, FileFormat format, long recordCount,
- long fileSizeInBytes) {
- this(filePath, format, null, recordCount, fileSizeInBytes);
- }
-
- GenericDataFile(String filePath, FileFormat format, PartitionData partition,
- long recordCount, long fileSizeInBytes) {
- super(FileContent.DATA, filePath, format, partition, fileSizeInBytes,
recordCount,
- null, null, null, null, null, null, null);
- }
-
- GenericDataFile(String filePath, FileFormat format, PartitionData partition,
- long fileSizeInBytes, Metrics metrics, List<Long>
splitOffsets) {
- this(filePath, format, partition, fileSizeInBytes, metrics, null,
splitOffsets);
- }
-
- GenericDataFile(String filePath, FileFormat format, PartitionData partition,
+ GenericDataFile(int specId, String filePath, FileFormat format,
PartitionData partition,
long fileSizeInBytes, Metrics metrics,
ByteBuffer keyMetadata, List<Long> splitOffsets) {
- super(FileContent.DATA, filePath, format, partition, fileSizeInBytes,
metrics.recordCount(),
+ super(specId, FileContent.DATA, filePath, format, partition,
fileSizeInBytes, metrics.recordCount(),
metrics.columnSizes(), metrics.valueCounts(),
metrics.nullValueCounts(),
metrics.lowerBounds(), metrics.upperBounds(), splitOffsets,
keyMetadata);
}
diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
index 6f9b637..0ad76a9 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -34,9 +34,9 @@ class GenericDeleteFile extends BaseFile<DeleteFile>
implements DeleteFile {
super(avroSchema);
}
- GenericDeleteFile(FileContent content, String filePath, FileFormat format,
PartitionData partition,
+ GenericDeleteFile(int specId, FileContent content, String filePath,
FileFormat format, PartitionData partition,
long fileSizeInBytes, Metrics metrics, ByteBuffer
keyMetadata) {
- super(content, filePath, format, partition, fileSizeInBytes,
metrics.recordCount(),
+ super(specId, content, filePath, format, partition, fileSizeInBytes,
metrics.recordCount(),
metrics.columnSizes(), metrics.valueCounts(),
metrics.nullValueCounts(),
metrics.lowerBounds(), metrics.upperBounds(), null, keyMetadata);
}
diff --git
a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
index d8ceb40..ad396ea 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
+++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
@@ -34,7 +34,7 @@ class InheritableMetadataFactory {
static InheritableMetadata fromManifest(ManifestFile manifest) {
Preconditions.checkArgument(manifest.snapshotId() != null,
"Cannot read from ManifestFile with null (unassigned) snapshot ID");
- return new BaseInheritableMetadata(manifest.snapshotId(),
manifest.sequenceNumber());
+ return new BaseInheritableMetadata(manifest.partitionSpecId(),
manifest.snapshotId(), manifest.sequenceNumber());
}
static InheritableMetadata forCopy(long snapshotId) {
@@ -42,16 +42,22 @@ class InheritableMetadataFactory {
}
static class BaseInheritableMetadata implements InheritableMetadata {
+ private final int specId;
private final long snapshotId;
private final long sequenceNumber;
- private BaseInheritableMetadata(long snapshotId, long sequenceNumber) {
+ private BaseInheritableMetadata(int specId, long snapshotId, long
sequenceNumber) {
+ this.specId = specId;
this.snapshotId = snapshotId;
this.sequenceNumber = sequenceNumber;
}
@Override
public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F>
manifestEntry) {
+ if (manifestEntry.file() instanceof BaseFile) {
+ BaseFile<?> file = (BaseFile<?>) manifestEntry.file();
+ file.setSpecId(specId);
+ }
if (manifestEntry.snapshotId() == null) {
manifestEntry.setSnapshotId(snapshotId);
}
diff --git a/core/src/main/java/org/apache/iceberg/StaticDataTask.java
b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
index 11ce156..24bff01 100644
--- a/core/src/main/java/org/apache/iceberg/StaticDataTask.java
+++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -42,7 +42,7 @@ class StaticDataTask implements DataTask {
private final StructLike[] rows;
private StaticDataTask(InputFile metadata, StructLike[] rows) {
- this.metadataFile = DataFiles.builder()
+ this.metadataFile = DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(metadata)
.withRecordCount(rows.length)
.withFormat(FileFormat.METADATA)
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java
b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index 5615ee1..cb6c922 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -365,6 +365,11 @@ class V1Metadata {
}
@Override
+ public int specId() {
+ return wrapped.specId();
+ }
+
+ @Override
public FileContent content() {
return wrapped.content();
}
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java
b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index c3d3d5e..c499950 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -413,6 +413,11 @@ class V2Metadata {
}
@Override
+ public int specId() {
+ return wrapped.specId();
+ }
+
+ @Override
public FileContent content() {
return wrapped.content();
}
diff --git
a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 32e2d9a..1621539 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -68,10 +68,10 @@ public class TestManifestWriterVersions {
private static final List<Long> OFFSETS = ImmutableList.of(4L);
private static final DataFile DATA_FILE = new GenericDataFile(
- PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS);
+ 0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS);
private static final DeleteFile DELETE_FILE = new GenericDeleteFile(
- FileContent.EQUALITY_DELETES, PATH, FORMAT, PARTITION, 22905L, METRICS,
null);
+ 0, FileContent.EQUALITY_DELETES, PATH, FORMAT, PARTITION, 22905L,
METRICS, null);
@Rule
public TemporaryFolder temp = new TemporaryFolder();
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index e88f9bf..dc47802 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -681,13 +681,15 @@ public class TestMergeAppend extends TableTestBase {
V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
V1Assert.assertEquals("Table should end with last-sequence-number 0", 0,
readMetadata().lastSequenceNumber());
- DataFile newFileC = DataFiles.builder(newSpec)
- .copy(FILE_C)
+ DataFile newFileY = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-y.parquet")
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2/id_bucket=3")
+ .withRecordCount(1)
.build();
table.newAppend()
- .appendFile(newFileC)
+ .appendFile(newFileY)
.commit();
Snapshot lastSnapshot = table.currentSnapshot();
@@ -702,7 +704,7 @@ public class TestMergeAppend extends TableTestBase {
validateManifest(lastSnapshot.allManifests().get(0),
seqs(2),
ids(lastSnapshot.snapshotId()),
- files(newFileC),
+ files(newFileY),
statuses(Status.ADDED)
);
@@ -747,13 +749,15 @@ public class TestMergeAppend extends TableTestBase {
V2Assert.assertEquals("Last sequence number should be 2", 2,
readMetadata().lastSequenceNumber());
V1Assert.assertEquals("Table should end with last-sequence-number 0", 0,
readMetadata().lastSequenceNumber());
- DataFile newFileC = DataFiles.builder(newSpec)
- .copy(FILE_C)
+ DataFile newFileY = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-y.parquet")
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2/id_bucket=3")
+ .withRecordCount(1)
.build();
table.newAppend()
- .appendFile(newFileC)
+ .appendFile(newFileY)
.commit();
Snapshot lastSnapshot = table.currentSnapshot();
V2Assert.assertEquals("Snapshot sequence number should be 3", 3,
lastSnapshot.sequenceNumber());
@@ -768,7 +772,7 @@ public class TestMergeAppend extends TableTestBase {
validateManifest(lastSnapshot.allManifests().get(0),
seqs(3),
ids(lastSnapshot.snapshotId()),
- files(newFileC),
+ files(newFileY),
statuses(Status.ADDED)
);
validateManifest(lastSnapshot.allManifests().get(1),
@@ -1129,8 +1133,12 @@ public class TestMergeAppend extends TableTestBase {
V2Assert.assertEquals("Last sequence number should be 1", 1,
readMetadata().lastSequenceNumber());
V1Assert.assertEquals("Table should end with last-sequence-number 0", 0,
readMetadata().lastSequenceNumber());
+ // create a new data file with the table's current spec
DataFile newFile = DataFiles.builder(table.spec())
- .copy(FILE_B)
+ .withPath("/path/to/data-x.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("id_bucket=1/data_bucket=1")
+ .withRecordCount(1)
.build();
table.newAppend()
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index 7a2c5fb..5c179ee 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -579,22 +579,26 @@ public class TestRewriteManifests extends TableTestBase {
// commit the new partition spec to the table manually
table.ops().commit(base, base.updatePartitionSpec(newSpec));
- DataFile newFileC = DataFiles.builder(newSpec)
- .copy(FILE_C)
+ DataFile newFileY = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-y.parquet")
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2/id_bucket=3")
+ .withRecordCount(1)
.build();
table.newAppend()
- .appendFile(newFileC)
+ .appendFile(newFileY)
.commit();
- DataFile newFileD = DataFiles.builder(newSpec)
- .copy(FILE_D)
+ DataFile newFileZ = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-z.parquet")
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2/id_bucket=4")
+ .withRecordCount(1)
.build();
table.newAppend()
- .appendFile(newFileD)
+ .appendFile(newFileZ)
.commit();
Assert.assertEquals("Should use 3 manifest files",
@@ -648,22 +652,26 @@ public class TestRewriteManifests extends TableTestBase {
// commit the new partition spec to the table manually
table.ops().commit(base, base.updatePartitionSpec(newSpec));
- DataFile newFileC = DataFiles.builder(newSpec)
- .copy(FILE_C)
+ DataFile newFileY = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-y.parquet")
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2/id_bucket=3")
+ .withRecordCount(1)
.build();
table.newAppend()
- .appendFile(newFileC)
+ .appendFile(newFileY)
.commit();
- DataFile newFileD = DataFiles.builder(newSpec)
- .copy(FILE_D)
+ DataFile newFileZ = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-z.parquet")
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2/id_bucket=4")
+ .withRecordCount(1)
.build();
table.newAppend()
- .appendFile(newFileD)
+ .appendFile(newFileZ)
.commit();
Assert.assertEquals("Rewrite manifests should produce 3 manifest files",
diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
index 10ace67..5acd2a3 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
@@ -64,7 +64,6 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import static org.apache.iceberg.DataFiles.fromInputFile;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.apache.iceberg.expressions.Expressions.lessThan;
import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
@@ -255,7 +254,11 @@ public class TestLocalScan {
}
writeFile(location.toString(), format.addExtension("file-" + fileNum),
records);
- append.appendFile(fromInputFile(HadoopInputFile.fromPath(path, CONF),
numRecords));
+ DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
+ .withRecordCount(numRecords)
+ .withInputFile(HadoopInputFile.fromPath(path, CONF))
+ .build();
+ append.appendFile(file);
fileNum += 1;
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index fe7c004..48dd001 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -88,6 +88,11 @@ public class SparkDataFile implements DataFile {
}
@Override
+ public int specId() {
+ return -1;
+ }
+
+ @Override
public CharSequence path() {
return wrapped.getAs(filePathPosition);
}
diff --git
a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index c46b191..3a60167 100644
---
a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++
b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -265,7 +265,10 @@ public abstract class TestPartitionValues {
// add the Avro data file to the source table
source.newAppend()
- .appendFile(DataFiles.fromInputFile(Files.localInput(avroData), 10))
+ .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
+ .withRecordCount(10)
+ .withInputFile(Files.localInput(avroData))
+ .build())
.commit();
Dataset<Row> sourceDF = spark.read().format("iceberg")
@@ -330,7 +333,10 @@ public abstract class TestPartitionValues {
// add the Avro data file to the source table
source.newAppend()
- .appendFile(DataFiles.fromInputFile(Files.localInput(avroData), 10))
+ .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
+ .withRecordCount(10)
+ .withInputFile(Files.localInput(avroData))
+ .build())
.commit();
Dataset<Row> sourceDF = spark.read().format("iceberg")