This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 457a1f1991 Add InMemoryFileIO as a test helper class (#6538)
457a1f1991 is described below
commit 457a1f1991a1d01faa074796cc476f6f0b7846cd
Author: Dennis Huo <[email protected]>
AuthorDate: Mon Jan 9 13:27:10 2023 -0800
Add InMemoryFileIO as a test helper class (#6538)
* Add InMemoryFileIO alongside the existing InMemoryOutputFile and
InMemoryInputFile as a test helper class that stitches the two together
and maintains an in-memory listing of files. Add a dedicated
unit test for the new test helper.
* Update variable naming for consistency, refactor to avoid using
containsKey
* Use Maps.newConcurrentMap instead of Maps.newHashMap
---
.../java/org/apache/iceberg/io/InMemoryFileIO.java | 72 +++++++++++++
.../org/apache/iceberg/io/InMemoryOutputFile.java | 26 ++++-
.../org/apache/iceberg/io/TestInMemoryFileIO.java | 111 +++++++++++++++++++++
3 files changed, 207 insertions(+), 2 deletions(-)
diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryFileIO.java b/core/src/test/java/org/apache/iceberg/io/InMemoryFileIO.java
new file mode 100644
index 0000000000..41756043ff
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/InMemoryFileIO.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.util.Map;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * A {@link FileIO} test helper that keeps file contents in memory, keyed by location.
+ *
+ * <p>Files become visible to {@link #newInputFile(String)} and {@link #fileExists(String)} once
+ * they are added with {@link #addFile(String, byte[])}, or once an output stream created from
+ * {@link #newOutputFile(String)} is closed. After {@link #close()}, {@link #addFile}, {@link
+ * #newInputFile}, {@link #newOutputFile} and {@link #deleteFile} throw {@link
+ * IllegalStateException}.
+ */
+public class InMemoryFileIO implements FileIO {
+
+  private final Map<String, byte[]> inMemoryFiles = Maps.newConcurrentMap();
+  // The file map is concurrent; volatile makes a close() on one thread visible to the closed
+  // checks performed on other threads.
+  private volatile boolean closed = false;
+
+  /**
+   * Stores {@code contents} at {@code location}, replacing any file already stored there.
+   *
+   * @param location the location to store the file under
+   * @param contents the file bytes; a copy is stored, so later changes to the caller's array do
+   *     not alter the stored file
+   * @throws IllegalStateException if this FileIO has been closed
+   * @throws NullPointerException if {@code location} or {@code contents} is null
+   */
+  public void addFile(String location, byte[] contents) {
+    Preconditions.checkState(!closed, "Cannot call addFile after calling close()");
+    Preconditions.checkNotNull(location, "location is null");
+    Preconditions.checkNotNull(contents, "contents is null");
+    inMemoryFiles.put(location, contents.clone());
+  }
+
+  /**
+   * Returns whether a file is currently stored at {@code location}. Unlike the other operations,
+   * this check does not verify whether this FileIO has been closed.
+   */
+  public boolean fileExists(String location) {
+    return inMemoryFiles.containsKey(location);
+  }
+
+  /**
+   * Returns an {@link InputFile} over the contents currently stored at {@code location}.
+   *
+   * @throws IllegalStateException if this FileIO has been closed
+   * @throws NotFoundException if no file is stored at {@code location}
+   */
+  @Override
+  public InputFile newInputFile(String location) {
+    Preconditions.checkState(!closed, "Cannot call newInputFile after calling close()");
+    byte[] contents = inMemoryFiles.get(location);
+    if (null == contents) {
+      throw new NotFoundException("No in-memory file found for location: %s", location);
+    }
+    return new InMemoryInputFile(location, contents);
+  }
+
+  /**
+   * Returns an {@link OutputFile} at {@code location} that is backed by this FileIO: its contents
+   * are added here when an output stream created from it is closed.
+   *
+   * @throws IllegalStateException if this FileIO has been closed
+   */
+  @Override
+  public OutputFile newOutputFile(String location) {
+    Preconditions.checkState(!closed, "Cannot call newOutputFile after calling close()");
+    return new InMemoryOutputFile(location, this);
+  }
+
+  /**
+   * Removes the file stored at {@code location}.
+   *
+   * @throws IllegalStateException if this FileIO has been closed
+   * @throws NotFoundException if no file is stored at {@code location}
+   */
+  @Override
+  public void deleteFile(String location) {
+    Preconditions.checkState(!closed, "Cannot call deleteFile after calling close()");
+    if (null == inMemoryFiles.remove(location)) {
+      throw new NotFoundException("No in-memory file found for location: %s", location);
+    }
+  }
+
+  /** Returns whether {@link #close()} has been called. */
+  public boolean isClosed() {
+    return closed;
+  }
+
+  /** Marks this FileIO as closed. Stored files are not cleared. */
+  @Override
+  public void close() {
+    closed = true;
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
index e8740b125f..5d72cef622 100644
--- a/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
+++ b/core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
@@ -30,19 +30,38 @@ public class InMemoryOutputFile implements OutputFile {
private boolean exists = false;
private ByteArrayOutputStream contents;
+  private final InMemoryFileIO parentFileIO;
+  /** Creates an output file at a random {@code memory:} location with no parent FileIO. */
public InMemoryOutputFile() {
this("memory:" + UUID.randomUUID());
}
+  /** Creates an output file at {@code location} with no parent FileIO. */
public InMemoryOutputFile(String location) {
+    this(location, null);
+  }
+
+  /**
+   * Creates an output file at {@code location}, optionally backed by {@code parentFileIO}.
+   *
+   * <p>If the optional parentFileIO is provided, file-existence behaves similarly to S3FileIO:
+   * existence checks are performed up-front when creating without overwrite, but a file only
+   * exists in the parentFileIO once close() has been called on an associated output stream (or
+   * once the file has been populated into the parentFileIO through other means).
+   *
+   * @param location the location returned by {@link #location()} of this OutputFile and by the
+   *     InputFile obtained from {@link #toInputFile()}; also the key used to look up the
+   *     associated file in the parentFileIO, if non-null
+   * @param parentFileIO if non-null, receives this file's contents when an associated output
+   *     stream is closed, and is consulted for "already exists" checks when creating without
+   *     overwriting
+   */
+  public InMemoryOutputFile(String location, InMemoryFileIO parentFileIO) {
Preconditions.checkNotNull(location, "location is null");
this.location = location;
+    this.parentFileIO = parentFileIO;
}
@Override
public PositionOutputStream create() {
- if (exists) {
+ if (exists || (parentFileIO != null && parentFileIO.fileExists(location)))
{
throw new AlreadyExistsException("Already exists");
}
return createOrOverwrite();
@@ -70,7 +89,7 @@ public class InMemoryOutputFile implements OutputFile {
return contents.toByteArray();
}
- private static class InMemoryPositionOutputStream extends
PositionOutputStream {
+ private class InMemoryPositionOutputStream extends PositionOutputStream {
private final ByteArrayOutputStream delegate;
private boolean closed = false;
@@ -112,6 +131,9 @@ public class InMemoryOutputFile implements OutputFile {
public void close() throws IOException {
+      if (closed) {
+        // A repeated close() has no effect (Closeable contract). Without this guard a second call
+        // would publish to the parent FileIO again, or fail once that FileIO has been closed.
+        return;
+      }
delegate.close();
closed = true;
+      if (parentFileIO != null) {
+        // Publish exactly the bytes written through this stream's own delegate, not whatever the
+        // enclosing file's shared contents buffer currently holds.
+        parentFileIO.addFile(location(), delegate.toByteArray());
+      }
}
private void checkOpen() {
diff --git a/core/src/test/java/org/apache/iceberg/io/TestInMemoryFileIO.java b/core/src/test/java/org/apache/iceberg/io/TestInMemoryFileIO.java
new file mode 100644
index 0000000000..95118ec7d1
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/TestInMemoryFileIO.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+/** Tests for {@link InMemoryFileIO} and its interaction with {@link InMemoryOutputFile}. */
+public class TestInMemoryFileIO {
+  private static final String LOCATION = "s3://foo/bar.txt";
+
+  @Test
+  public void testBasicEndToEnd() throws IOException {
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    Assertions.assertThat(fileIO.fileExists(LOCATION)).isFalse();
+
+    try (OutputStream outputStream = fileIO.newOutputFile(LOCATION).create()) {
+      outputStream.write(bytes("hello world"));
+    }
+    Assertions.assertThat(fileIO.fileExists(LOCATION)).isTrue();
+    Assertions.assertThat(readContents(fileIO, LOCATION)).isEqualTo("hello world");
+
+    fileIO.deleteFile(LOCATION);
+    Assertions.assertThat(fileIO.fileExists(LOCATION)).isFalse();
+  }
+
+  @Test
+  public void testNewInputFileNotFound() {
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    Assertions.assertThatExceptionOfType(NotFoundException.class)
+        .isThrownBy(() -> fileIO.newInputFile("s3://nonexistent/file"));
+  }
+
+  @Test
+  public void testDeleteFileNotFound() {
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    Assertions.assertThatExceptionOfType(NotFoundException.class)
+        .isThrownBy(() -> fileIO.deleteFile("s3://nonexistent/file"));
+  }
+
+  @Test
+  public void testCreateNoOverwrite() {
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    fileIO.addFile(LOCATION, bytes("hello world"));
+    Assertions.assertThatExceptionOfType(AlreadyExistsException.class)
+        .isThrownBy(() -> fileIO.newOutputFile(LOCATION).create());
+  }
+
+  @Test
+  public void testOverwriteBeforeAndAfterClose() throws IOException {
+    byte[] oldData = bytes("old data");
+    byte[] newData = bytes("new data");
+
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    OutputStream outputStream = fileIO.newOutputFile(LOCATION).create();
+    outputStream.write(oldData);
+
+    // Even though we've called create() and started writing data, this file won't yet exist
+    // in the parentFileIO before we've closed it.
+    Assertions.assertThat(fileIO.fileExists(LOCATION)).isFalse();
+
+    // File appears after closing it.
+    outputStream.close();
+    Assertions.assertThat(fileIO.fileExists(LOCATION)).isTrue();
+
+    // Start a new OutputFile and write new data but don't close() it yet.
+    outputStream = fileIO.newOutputFile(LOCATION).createOrOverwrite();
+    outputStream.write(newData);
+
+    // We'll still read old data.
+    Assertions.assertThat(readContents(fileIO, LOCATION)).isEqualTo("old data");
+
+    // Finally, close the new output stream; data should be overwritten with new data now.
+    outputStream.close();
+    Assertions.assertThat(readContents(fileIO, LOCATION)).isEqualTo("new data");
+  }
+
+  @Test
+  public void testOperationsAfterClose() {
+    InMemoryFileIO fileIO = new InMemoryFileIO();
+    fileIO.addFile(LOCATION, bytes("hello world"));
+    Assertions.assertThat(fileIO.isClosed()).isFalse();
+
+    fileIO.close();
+    Assertions.assertThat(fileIO.isClosed()).isTrue();
+
+    Assertions.assertThatThrownBy(() -> fileIO.addFile(LOCATION, bytes("new data")))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot call addFile after calling close()");
+    Assertions.assertThatThrownBy(() -> fileIO.newInputFile(LOCATION))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot call newInputFile after calling close()");
+    Assertions.assertThatThrownBy(() -> fileIO.newOutputFile(LOCATION))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot call newOutputFile after calling close()");
+    Assertions.assertThatThrownBy(() -> fileIO.deleteFile(LOCATION))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot call deleteFile after calling close()");
+  }
+
+  /** Encodes {@code text} as UTF-8 so the tests do not depend on the platform charset. */
+  private static byte[] bytes(String text) {
+    return text.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Reads the whole file at {@code location} and decodes it as UTF-8. Reading to end-of-stream
+   * (rather than a single read() into a fixed-size buffer) means short reads or trailing bytes
+   * cannot go unnoticed.
+   */
+  private static String readContents(InMemoryFileIO fileIO, String location) throws IOException {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    try (InputStream inputStream = fileIO.newInputFile(location).newStream()) {
+      byte[] chunk = new byte[1024];
+      int bytesRead;
+      while ((bytesRead = inputStream.read(chunk)) != -1) {
+        buffer.write(chunk, 0, bytesRead);
+      }
+    }
+    return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
+  }
+}