timoninmaxim commented on code in PR #11040:
URL: https://github.com/apache/ignite/pull/11040#discussion_r1399654643
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -2205,6 +2208,7 @@ public IgniteFutureImpl<Void> createSnapshot(
* @param incremental Incremental snapshot flag.
* @param onlyPrimary If {@code true} snapshot only primary copies of
partitions.
* @param dump If {@code true} cache dump must be created.
+ * @param compress If {@code true} then compress the file.
Review Comment:
Which "the file"?
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java:
##########
@@ -63,6 +63,14 @@ public class SnapshotMetadata implements Serializable {
*/
private final String folderName;
+ /**
+ * If {@code true} then compress the files.
+ * This shouldn't be confused with {@link SnapshotMetadata#comprGrpIds}
which represents how Ignite keeps data in memory pages
+ * while {@link SnapshotMetadata#comprParts} represents how the dump files
stored onto disk.
+ */
+ @SuppressWarnings("JavadocDeclaration")
Review Comment:
Why do you need this annotation?
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java:
##########
@@ -63,6 +63,14 @@ public class SnapshotMetadata implements Serializable {
*/
private final String folderName;
+ /**
+ * If {@code true} then compress the files.
Review Comment:
"compress partition files".
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java:
##########
@@ -183,6 +194,13 @@ public String folderName() {
return folderName;
}
+ /**
+ * @return {@code true} if compress the files.
Review Comment:
"compress partition files"
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java:
##########
@@ -94,6 +94,9 @@ public class SnapshotOperationRequest implements Serializable
{
/** If {@code true} then create dump. */
private final boolean dump;
+ /** If {@code true} then compress the file. */
Review Comment:
the same
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java:
##########
@@ -222,32 +224,41 @@ public List<StoredCacheData> configs(String node, int
group) {
/**
* @param node Node directory name.
* @param group Group id.
+ * @param compressed {@code true} if partition is compressed.
* @return Dump iterator.
*/
- public List<Integer> partitions(String node, int group) {
+ public List<Integer> partitions(String node, int group, boolean
compressed) {
File[] parts = dumpGroupDirectory(node, group)
- .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) &&
f.getName().endsWith(DUMP_FILE_EXT));
+ .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) &&
+ (f.getName().endsWith(compressed ? DUMP_FILE_EXT + ZIP_SUFFIX
: DUMP_FILE_EXT))
+ );
if (parts == null)
return Collections.emptyList();
return Arrays.stream(parts)
- .map(partFile ->
Integer.parseInt(partFile.getName().replace(PART_FILE_PREFIX,
"").replace(DUMP_FILE_EXT, "")))
+ .map(partFile -> Integer.parseInt(partFile.getName()
+ .replace(PART_FILE_PREFIX, "")
+ .replace(compressed ? DUMP_FILE_EXT + ZIP_SUFFIX :
DUMP_FILE_EXT, "")
+ ))
.collect(Collectors.toList());
}
/**
* @param node Node directory name.
* @param group Group id.
+ * @param compressed {@code true} if the pertition is compressed.
* @return Dump iterator.
*/
- public DumpedPartitionIterator iterator(String node, int group, int part) {
- FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+ public DumpedPartitionIterator iterator(String node, int group, int part,
boolean compressed) {
Review Comment:
Can extract the flag from metadata
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java:
##########
@@ -222,32 +224,41 @@ public List<StoredCacheData> configs(String node, int
group) {
/**
* @param node Node directory name.
* @param group Group id.
+ * @param compressed {@code true} if partition is compressed.
* @return Dump iterator.
*/
- public List<Integer> partitions(String node, int group) {
+ public List<Integer> partitions(String node, int group, boolean
compressed) {
File[] parts = dumpGroupDirectory(node, group)
- .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) &&
f.getName().endsWith(DUMP_FILE_EXT));
+ .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) &&
+ (f.getName().endsWith(compressed ? DUMP_FILE_EXT + ZIP_SUFFIX
: DUMP_FILE_EXT))
+ );
if (parts == null)
return Collections.emptyList();
return Arrays.stream(parts)
- .map(partFile ->
Integer.parseInt(partFile.getName().replace(PART_FILE_PREFIX,
"").replace(DUMP_FILE_EXT, "")))
+ .map(partFile -> Integer.parseInt(partFile.getName()
+ .replace(PART_FILE_PREFIX, "")
+ .replace(compressed ? DUMP_FILE_EXT + ZIP_SUFFIX :
DUMP_FILE_EXT, "")
+ ))
.collect(Collectors.toList());
}
/**
* @param node Node directory name.
* @param group Group id.
+ * @param compressed {@code true} if the pertition is compressed.
* @return Dump iterator.
*/
- public DumpedPartitionIterator iterator(String node, int group, int part) {
- FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+ public DumpedPartitionIterator iterator(String node, int group, int part,
boolean compressed) {
+ FileIOFactory ioFactory = compressed ? new UnzipFileIOFactory() : new
RandomAccessFileIOFactory();
FileIO dumpFile;
try {
- dumpFile = ioFactory.create(new File(dumpGroupDirectory(node,
group), PART_FILE_PREFIX + part + DUMP_FILE_EXT));
+ dumpFile = ioFactory.create(new File(dumpGroupDirectory(node,
group),
+ PART_FILE_PREFIX + part + DUMP_FILE_EXT + (compressed ?
ZIP_SUFFIX : "")
Review Comment:
Let's make a method that returns the name of the file. There are at least two
places where this code is written.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java:
##########
@@ -222,32 +224,41 @@ public List<StoredCacheData> configs(String node, int
group) {
/**
* @param node Node directory name.
* @param group Group id.
+ * @param compressed {@code true} if partition is compressed.
* @return Dump iterator.
*/
- public List<Integer> partitions(String node, int group) {
+ public List<Integer> partitions(String node, int group, boolean
compressed) {
File[] parts = dumpGroupDirectory(node, group)
- .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) &&
f.getName().endsWith(DUMP_FILE_EXT));
+ .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) &&
+ (f.getName().endsWith(compressed ? DUMP_FILE_EXT + ZIP_SUFFIX
: DUMP_FILE_EXT))
Review Comment:
Code duplication within this method. Let's use a var `suffix` in both cases.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/WriteOnlyZipFileIO.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import
org.apache.ignite.internal.processors.cache.persistence.file.AbstractFileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.util.typedef.internal.A;
+
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX;
+
+/**
+ * {@link FileIO} that allows to write ZIP compressed file.
+ * It doesn't support reading or random access.
+ * It is not designed for writing concurrently from several threads.
+ */
+public class WriteOnlyZipFileIO extends AbstractFileIO {
+ /** Buffer size */
+ private static final int BUFFER_SIZE = 128 * 1024;
+
+ /** */
+ private final ZipOutputStream zos;
+
+ /** */
+ private final WritableByteChannel ch;
+
+ /** */
+ private long pos;
+
+ /** */
+ public WriteOnlyZipFileIO(File file) throws IOException {
+ A.ensure(file.getName().endsWith(ZIP_SUFFIX), "File name should end
with " + ZIP_SUFFIX);
+
+ String entryName = file.getName().substring(0, file.getName().length()
- ZIP_SUFFIX.length());
+
+ zos = new ZipOutputStream(new
BufferedOutputStream(Files.newOutputStream(Paths.get(file.getPath())),
BUFFER_SIZE));
+
+ zos.setLevel(9);
Review Comment:
Magic number: use a named constant (e.g. `Deflater.BEST_COMPRESSION`) instead of the literal `9`.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java:
##########
@@ -105,6 +108,7 @@ public class SnapshotOperationRequest implements
Serializable {
* @param incIdx Incremental snapshot index.
* @param onlyPrimary If {@code true} snapshot only primary copies of
partitions.
* @param dump If {@code true} then create dump.
+ * @param compress If {@code true} then compress the file.
Review Comment:
the same
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java:
##########
@@ -222,32 +224,41 @@ public List<StoredCacheData> configs(String node, int
group) {
/**
* @param node Node directory name.
* @param group Group id.
+ * @param compressed {@code true} if partition is compressed.
* @return Dump iterator.
*/
- public List<Integer> partitions(String node, int group) {
+ public List<Integer> partitions(String node, int group, boolean
compressed) {
Review Comment:
Can extract `compressed` from metadata
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java:
##########
@@ -109,6 +117,7 @@ public class SnapshotMetadata implements Serializable {
* @param snpName Snapshot name.
* @param consId Consistent id of a node to which this metadata relates.
* @param folderName Directory name which stores the data files.
+ * @param comprParts If {@code true} then zip the files.
Review Comment:
"compress partition files"
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java:
##########
@@ -207,6 +213,11 @@ public boolean dump() {
return dump;
}
+ /** @return If {@code true} then compress the file. */
Review Comment:
the same
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java:
##########
@@ -183,6 +194,13 @@ public String folderName() {
return folderName;
}
+ /**
+ * @return {@code true} if compress the files.
+ */
+ public boolean comprParts() {
Review Comment:
`compressPartitions()`
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java:
##########
@@ -222,32 +224,41 @@ public List<StoredCacheData> configs(String node, int
group) {
/**
* @param node Node directory name.
* @param group Group id.
+ * @param compressed {@code true} if partition is compressed.
* @return Dump iterator.
*/
- public List<Integer> partitions(String node, int group) {
+ public List<Integer> partitions(String node, int group, boolean
compressed) {
File[] parts = dumpGroupDirectory(node, group)
- .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) &&
f.getName().endsWith(DUMP_FILE_EXT));
+ .listFiles(f -> f.getName().startsWith(PART_FILE_PREFIX) &&
+ (f.getName().endsWith(compressed ? DUMP_FILE_EXT + ZIP_SUFFIX
: DUMP_FILE_EXT))
+ );
if (parts == null)
return Collections.emptyList();
return Arrays.stream(parts)
- .map(partFile ->
Integer.parseInt(partFile.getName().replace(PART_FILE_PREFIX,
"").replace(DUMP_FILE_EXT, "")))
+ .map(partFile -> Integer.parseInt(partFile.getName()
+ .replace(PART_FILE_PREFIX, "")
+ .replace(compressed ? DUMP_FILE_EXT + ZIP_SUFFIX :
DUMP_FILE_EXT, "")
+ ))
.collect(Collectors.toList());
}
/**
* @param node Node directory name.
* @param group Group id.
+ * @param compressed {@code true} if the pertition is compressed.
Review Comment:
Misprint in "pertition"
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -141,7 +147,8 @@ public CreateDumpFutureTask(
);
this.dumpDir = dumpDir;
- this.ioFactory = ioFactory;
+ this.ioFactory = compress ? new WriteOnlyZipFileIOFactory() :
ioFactory;
Review Comment:
Does it mean that we try to zip every entry? I suppose that compressing a block
of data (that contains at least several keys) should be much more efficient. What
do you think about compressing the final file instead of every entry?
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/WriteOnlyZipFileIO.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import
org.apache.ignite.internal.processors.cache.persistence.file.AbstractFileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.util.typedef.internal.A;
+
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX;
+
+/**
+ * {@link FileIO} that allows to write ZIP compressed file.
+ * It doesn't support reading or random access.
+ * It is not designed for writing concurrently from several threads.
+ */
+public class WriteOnlyZipFileIO extends AbstractFileIO {
+ /** Buffer size */
+ private static final int BUFFER_SIZE = 128 * 1024;
+
+ /** */
+ private final ZipOutputStream zos;
+
+ /** */
+ private final WritableByteChannel ch;
+
+ /** */
+ private long pos;
+
+ /** */
+ public WriteOnlyZipFileIO(File file) throws IOException {
+ A.ensure(file.getName().endsWith(ZIP_SUFFIX), "File name should end
with " + ZIP_SUFFIX);
+
+ String entryName = file.getName().substring(0, file.getName().length()
- ZIP_SUFFIX.length());
+
+ zos = new ZipOutputStream(new
BufferedOutputStream(Files.newOutputStream(Paths.get(file.getPath())),
BUFFER_SIZE));
+
+ zos.setLevel(9);
Review Comment:
Why BEST_COMPRESSION (9), and not BEST_SPEED (1)? See
`DataStorageConfiguration#DFLT_WAL_COMPACTION_LEVEL`. I suppose speed should come
first, but it should be configurable the same way as WAL compression is.
Let's discuss it with @nizhikov
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]