This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 3e641b2530e [HUDI-6090] Optimise payload size for list of FileGroupDTO (#8480) 3e641b2530e is described below commit 3e641b2530e1e6ba2428a62229be18c9b44c8112 Author: Lokesh Jain <lj...@apache.org> AuthorDate: Wed Apr 26 00:42:18 2023 +0530 [HUDI-6090] Optimise payload size for list of FileGroupDTO (#8480) - Optimise FileGroupDTO - avoid sending Timeline with every file group --------- Co-authored-by: sivabalan <n.siv...@gmail.com> --- .../hudi/common/table/timeline/dto/DTOUtils.java | 62 ++++++++++++++++++++++ .../common/table/timeline/dto/FileGroupDTO.java | 15 +++--- .../view/RemoteHoodieTableFileSystemView.java | 9 ++-- .../hudi/timeline/service/RequestHandler.java | 2 +- .../service/handlers/FileSliceHandler.java | 14 +++-- .../TestRemoteHoodieTableFileSystemView.java | 54 +++++++++++++++++++ 6 files changed, 140 insertions(+), 16 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DTOUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DTOUtils.java new file mode 100644 index 00000000000..ef5a8869487 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DTOUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.timeline.dto; + +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +/** + * DTO utils to hold batch apis. + */ +public class DTOUtils { + + public static List<FileGroupDTO> fileGroupDTOsfromFileGroups(List<HoodieFileGroup> fileGroups) { + if (fileGroups.isEmpty()) { + return Collections.emptyList(); + } else if (fileGroups.size() == 1) { + return Collections.singletonList(FileGroupDTO.fromFileGroup(fileGroups.get(0), true)); + } else { + List<FileGroupDTO> fileGroupDTOS = new ArrayList<>(); + fileGroupDTOS.add(FileGroupDTO.fromFileGroup(fileGroups.get(0), true)); + fileGroupDTOS.addAll(fileGroups.subList(1, fileGroups.size()).stream() + .map(fg -> FileGroupDTO.fromFileGroup(fg, false)).collect(Collectors.toList())); + return fileGroupDTOS; + } + } + + public static Stream<HoodieFileGroup> fileGroupDTOsToFileGroups(List<FileGroupDTO> dtos, HoodieTableMetaClient metaClient) { + if (dtos.isEmpty()) { + return Stream.empty(); + } + + // Timeline exists only in the first file group DTO. Optimisation to reduce payload size. + checkState(dtos.get(0).timeline != null, "Timeline is expected to be set for the first FileGroupDTO"); + HoodieTimeline timeline = TimelineDTO.toTimeline(dtos.get(0).timeline, metaClient); + return dtos.stream().map(dto -> FileGroupDTO.toFileGroup(dto, timeline)); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java index dfbd40126c0..bc5cbdb8022 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java @@ -19,7 +19,7 @@ package org.apache.hudi.common.table.timeline.dto; import org.apache.hudi.common.model.HoodieFileGroup; -import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; @@ -45,19 +45,20 @@ public class FileGroupDTO { @JsonProperty("timeline") TimelineDTO timeline; - public static FileGroupDTO fromFileGroup(HoodieFileGroup fileGroup) { + public static FileGroupDTO fromFileGroup(HoodieFileGroup fileGroup, boolean includeTimeline) { FileGroupDTO dto = new FileGroupDTO(); dto.partition = fileGroup.getPartitionPath(); dto.id = fileGroup.getFileGroupId().getFileId(); dto.slices = fileGroup.getAllRawFileSlices().map(FileSliceDTO::fromFileSlice).collect(Collectors.toList()); - dto.timeline = TimelineDTO.fromTimeline(fileGroup.getTimeline()); + if (includeTimeline) { + dto.timeline = TimelineDTO.fromTimeline(fileGroup.getTimeline()); + } return dto; } - public static HoodieFileGroup toFileGroup(FileGroupDTO dto, HoodieTableMetaClient metaClient) { - HoodieFileGroup fileGroup = - new HoodieFileGroup(dto.partition, dto.id, TimelineDTO.toTimeline(dto.timeline, metaClient)); - dto.slices.stream().map(FileSliceDTO::toFileSlice).forEach(fileSlice -> fileGroup.addFileSlice(fileSlice)); + public static HoodieFileGroup toFileGroup(FileGroupDTO dto, HoodieTimeline fgTimeline) { + HoodieFileGroup fileGroup = new HoodieFileGroup(dto.partition, dto.id, fgTimeline); + dto.slices.stream().map(FileSliceDTO::toFileSlice).forEach(fileGroup::addFileSlice); return fileGroup; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index e5895c84695..c1772db6bfc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.dto.BaseFileDTO; import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO; import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO; +import org.apache.hudi.common.table.timeline.dto.DTOUtils; import org.apache.hudi.common.table.timeline.dto.FileGroupDTO; import org.apache.hudi.common.table.timeline.dto.FileSliceDTO; import org.apache.hudi.common.table.timeline.dto.InstantDTO; @@ -398,7 +399,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, try { List<FileGroupDTO> fileGroups = executeRequest(ALL_FILEGROUPS_FOR_PARTITION_URL, paramsMap, new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET); - return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient)); + return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); } catch (IOException e) { throw new HoodieRemoteException(e); } @@ -410,7 +411,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, try { List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, paramsMap, new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET); - return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient)); + return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); } catch (IOException e) { throw new HoodieRemoteException(e); } @@ -422,7 +423,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, try { List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap, new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET); - return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient)); + return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); } catch (IOException e) { throw new HoodieRemoteException(e); } @@ -434,7 +435,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, try { List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap, new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET); - return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient)); + return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); } catch (IOException e) { throw new HoodieRemoteException(e); } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index aabec3d57a4..9f18cc3453b 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -160,7 +160,7 @@ public class RequestHandler { if (LOG.isDebugEnabled()) { LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient + "], localTimeline=" + localTimeline.getInstants()); - } + } if ((!localTimeline.getInstantsAsStream().findAny().isPresent()) && HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java index caf1e3c9399..44e12855ac7 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java @@ -21,8 +21,10 @@ package org.apache.hudi.timeline.service.handlers; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO; import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO; +import org.apache.hudi.common.table.timeline.dto.DTOUtils; import org.apache.hudi.common.table.timeline.dto.FileGroupDTO; import org.apache.hudi.common.table.timeline.dto.FileSliceDTO; import org.apache.hudi.common.table.view.FileSystemViewManager; @@ -95,23 +97,27 @@ public class FileSliceHandler extends Handler { } public List<FileGroupDTO> getAllFileGroups(String basePath, String partitionPath) { - return viewManager.getFileSystemView(basePath).getAllFileGroups(partitionPath).map(FileGroupDTO::fromFileGroup) + List<HoodieFileGroup> fileGroups = viewManager.getFileSystemView(basePath).getAllFileGroups(partitionPath) .collect(Collectors.toList()); + return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups); } public List<FileGroupDTO> getReplacedFileGroupsBeforeOrOn(String basePath, String maxCommitTime, String partitionPath) { - return viewManager.getFileSystemView(basePath).getReplacedFileGroupsBeforeOrOn(maxCommitTime, partitionPath).map(FileGroupDTO::fromFileGroup) + List<HoodieFileGroup> fileGroups = viewManager.getFileSystemView(basePath).getReplacedFileGroupsBeforeOrOn(maxCommitTime, partitionPath) .collect(Collectors.toList()); + return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups); } public List<FileGroupDTO> getReplacedFileGroupsBefore(String basePath, String maxCommitTime, String partitionPath) { - return viewManager.getFileSystemView(basePath).getReplacedFileGroupsBefore(maxCommitTime, partitionPath).map(FileGroupDTO::fromFileGroup) + List<HoodieFileGroup> fileGroups = viewManager.getFileSystemView(basePath).getReplacedFileGroupsBefore(maxCommitTime, partitionPath) .collect(Collectors.toList()); + return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups); } public List<FileGroupDTO> getAllReplacedFileGroups(String basePath, String partitionPath) { - return viewManager.getFileSystemView(basePath).getAllReplacedFileGroups(partitionPath).map(FileGroupDTO::fromFileGroup) + List<HoodieFileGroup> fileGroups = viewManager.getFileSystemView(basePath).getAllReplacedFileGroups(partitionPath) .collect(Collectors.toList()); + return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups); } public List<ClusteringOpDTO> getFileGroupsInPendingClustering(String basePath) { diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java index 0836a7558a2..c9a103e5264 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java @@ -21,23 +21,37 @@ package org.apache.hudi.timeline.service.functional; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.dto.DTOUtils; +import org.apache.hudi.common.table.timeline.dto.FileGroupDTO; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView; +import org.apache.hudi.common.testutils.MockHoodieTimeline; import org.apache.hudi.exception.HoodieRemoteException; import org.apache.hudi.timeline.service.TimelineService; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; /** @@ -124,4 +138,44 @@ public class TestRemoteHoodieTableFileSystemView extends TestHoodieTableFileSyst .forEach(t -> assertTrue(t.isDaemon())); server.close(); } + + @Test + public void testListFileGroupDTOPayload() throws IOException, NoSuchFieldException, IllegalAccessException { + ObjectMapper mapper = new ObjectMapper(); + List<HoodieFileGroup> fileGroups = new ArrayList<>(); + fileGroups.add(createHoodieFileGroup()); + fileGroups.add(createHoodieFileGroup()); + fileGroups.add(createHoodieFileGroup()); + + // Timeline exists only in the first file group DTO. Optimisation to reduce payload size. + Field timelineDTOField = FileGroupDTO.class.getDeclaredField("timeline"); + timelineDTOField.setAccessible(true); + List<FileGroupDTO> fileGroupDTOs = DTOUtils.fileGroupDTOsfromFileGroups(fileGroups); + assertNotNull(timelineDTOField.get(fileGroupDTOs.get(0))); + // Verify other DTO objects do not contain timeline + assertNull(timelineDTOField.get(fileGroupDTOs.get(1))); + assertNull(timelineDTOField.get(fileGroupDTOs.get(2))); + + String prettyResult = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(fileGroupDTOs); + String normalResult = mapper.writeValueAsString(fileGroupDTOs); + + Stream<HoodieFileGroup> prettyFileGroups = readFileGroupStream(prettyResult, mapper); + Stream<HoodieFileGroup> normalFileGroups = readFileGroupStream(normalResult, mapper); + // FileGroupDTO.toFileGroup should make sure Timeline is repopulated to all the FileGroups + prettyFileGroups.forEach(g -> assertNotNull(g.getTimeline())); + normalFileGroups.forEach(g -> assertNotNull(g.getTimeline())); + } + + private Stream<HoodieFileGroup> readFileGroupStream(String result, ObjectMapper mapper) throws IOException { + return DTOUtils.fileGroupDTOsToFileGroups((List<FileGroupDTO>) mapper.readValue(result, new TypeReference<List<FileGroupDTO>>() {}), + metaClient); + } + + private HoodieFileGroup createHoodieFileGroup() { + Stream<String> completed = Stream.of("001"); + Stream<String> inflight = Stream.of("002"); + MockHoodieTimeline activeTimeline = new MockHoodieTimeline(completed, inflight); + return new HoodieFileGroup("", "data", + activeTimeline.getCommitsTimeline().filterCompletedInstants()); + } }