[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274220577

## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java

## @@ -0,0 +1,329 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.view;
+
+import com.uber.hoodie.avro.model.HoodieCleanMetadata;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.avro.model.HoodieRestoreMetadata;
+import com.uber.hoodie.avro.model.HoodieRollbackMetadata;
+import com.uber.hoodie.common.model.CompactionOperation;
+import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieCommitMetadata;
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieFileGroup;
+import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.AvroUtils;
+import com.uber.hoodie.common.util.CompactionUtils;
+import com.uber.hoodie.common.util.TimelineDiffHelper;
+import com.uber.hoodie.common.util.TimelineDiffHelper.TimelineDiffResult;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.exception.HoodieException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Adds the capability to incrementally sync the changes to file-system view as and when new instants get completed.
+ */
+public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTableFileSystemView {
+
+  private static Logger log = LogManager.getLogger(IncrementalTimelineSyncFileSystemView.class);
+
+  @Override
+  protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) {
+    try {
+      TimelineDiffResult diffResult = TimelineDiffHelper.getNewInstantsForIncrementalSync(oldTimeline, newTimeline);
+      if (diffResult.canSyncIncrementally()) {
+        log.info("Doing incremental sync");
+        runIncrementalSync(newTimeline, diffResult);
+        log.info("Finished incremental sync");
+        // Reset timeline to latest
+        visibleActiveTimeline = newTimeline;
+      } else {
+        log.warn("Incremental Sync of timeline is deemed unsafe. Will revert to full syncing");
+        super.runSync(oldTimeline, newTimeline);
+      }
+    } catch (Exception ioe) {
+      log.error("Got exception trying to perform incremental sync. Reverting to complete sync", ioe);
+      super.runSync(oldTimeline, newTimeline);
+    }
+  }
+
+  /**
+   * Run incremental sync based on the diff result produced.
+   *
+   * @param timeline New Timeline
+   * @param diffResult Timeline Diff Result
+   */
+  private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diffResult) {
+
+    log.info("Timeline Diff Result is :" + diffResult);
+
+    // First remove pending compaction instants which were completed
+    diffResult.getFinishedCompactionInstants().stream().forEach(instant -> {
+      try {
+        removePendingCompactionInstant(timeline, instant);
+      } catch (IOException e) {
+        throw new HoodieException(e);
+      }
+    });
+
+    // Add new completed instants found in the latest timeline
+    diffResult.getNewlySeenInstants().stream()
+        .filter(instant -> instant.isCompleted() || instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
+        .forEach(instant -> {
+          try {
+            if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)
+                || instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+              addCommitInstant(timeline, instant);
+            } else if (instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)) {
+              addRestoreInstant(timeline, instant);
+            } else if (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)) {
+              addCleanInstant(timeline, instant);
+            } else if
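The diff quoted above shows the core pattern of this PR's view syncing: diff the cached timeline against the freshly read one, apply only the newly seen instants when that is safe, and otherwise rebuild the whole view, with any exception on the incremental path also degrading to a full resync. A minimal, self-contained sketch of that fallback logic (here a timeline is just a list of instant names, and a prefix check stands in for Hudi's `TimelineDiffHelper` — both are simplifications, not the PR's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of incremental-sync-with-full-resync-fallback, as in runSync() above.
// Timeline, DiffResult and the sync methods are simplified stand-ins, not Hudi's.
class IncrementalSyncSketch {

  static class DiffResult {
    final boolean canSyncIncrementally;
    final List<String> newInstants;

    DiffResult(boolean canSyncIncrementally, List<String> newInstants) {
      this.canSyncIncrementally = canSyncIncrementally;
      this.newInstants = newInstants;
    }
  }

  // Instants already applied to the (in-memory) view.
  private final List<String> syncedInstants = new ArrayList<>();

  // Incremental sync is only deemed safe when the old timeline is a strict
  // prefix of the new one (nothing was archived or rolled back in between).
  static DiffResult diff(List<String> oldTimeline, List<String> newTimeline) {
    if (newTimeline.size() >= oldTimeline.size()
        && newTimeline.subList(0, oldTimeline.size()).equals(oldTimeline)) {
      return new DiffResult(true,
          new ArrayList<>(newTimeline.subList(oldTimeline.size(), newTimeline.size())));
    }
    return new DiffResult(false, new ArrayList<>());
  }

  // Apply only the newly seen instants, or rebuild the view from scratch if unsafe.
  void runSync(List<String> oldTimeline, List<String> newTimeline) {
    DiffResult d = diff(oldTimeline, newTimeline);
    if (d.canSyncIncrementally) {
      syncedInstants.addAll(d.newInstants);   // incremental path
    } else {
      syncedInstants.clear();                 // full resync path
      syncedInstants.addAll(newTimeline);
    }
  }

  List<String> getSyncedInstants() {
    return syncedInstants;
  }
}
```

The important design point mirrored here is the catch-all fallback: any condition that makes the incremental path unsafe degrades to a full resync rather than leaving the view half-updated.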
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274223360

## File path: hoodie-common/src/main/java/com/uber/hoodie/common/util/RocksDBSchemaHelper.java

## @@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util;
+
+import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieFileGroup;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Helper class to generate Key and column names for rocksdb based view
+ */
+public class RocksDBSchemaHelper {

Review comment: Can you please draw out the schema here ?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
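For readers wondering what "drawing out the schema" could look like: a key-value store such as RocksDB has no tables, so a schema helper typically encodes entity type, partition, and file id into ordered string keys so that one partition's entries can be read back with a prefix scan. A sketch of that idea (the key layout below is invented for illustration and is not Hudi's actual `RocksDBSchemaHelper` schema):

```java
// Sketch of a schema helper for a key-value (RocksDB-style) file-system view.
// The key layout is illustrative only: ordered string keys let all entries of
// one partition be scanned with a single prefix seek.
class ViewKeySchemaSketch {

  // Key for one file-slice entry: entity type, then partition, then file id.
  static String keyForSliceView(String partitionPath, String fileId) {
    return String.format("type=slice,part=%s,id=%s", partitionPath, fileId);
  }

  // Prefix matching every slice entry of one partition (for range scans).
  static String prefixForSliceViewByPartition(String partitionPath) {
    return String.format("type=slice,part=%s,id=", partitionPath);
  }
}
```

Because keys sort lexicographically, every slice of a partition is contiguous under its prefix, which is what makes prefix seeks cheap.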
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274218850

## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/FileSystemViewManager.java

## @@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com)

Review comment: These can be changed to 2019 for all files...
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274220242

## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java

(quoted diff omitted — it repeats the IncrementalTimelineSyncFileSystemView excerpt quoted in the first comment; the remark is anchored on the exception fallback in runSync:)

+    } catch (Exception ioe) {
+      log.error("Got exception trying to perform incremental sync. Reverting to complete sync", ioe);
+      super.runSync(oldTimeline, newTimeline);

Review comment: Isn't this the same as line 66 ?
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274221323

## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java

(quoted diff omitted — it repeats the IncrementalTimelineSyncFileSystemView excerpt quoted in the first comment; the review comment itself was truncated in the archive)
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274219345

## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/FileSystemViewManager.java

## @@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.view;
+
+import com.uber.hoodie.common.SerializableConfiguration;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.HoodieView;
+import com.uber.hoodie.common.util.Functions.Function2;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * A container that can potentially hold one or more dataset's
+ * file-system views. There is one view for each dataset. This is a view built against a timeline containing completed
+ * actions. In an embedded timeline-server mode, this typically holds only one dataset's view.
+ * In a stand-alone server mode, this can hold more than one dataset's views.
+ */
+public class FileSystemViewManager {
+  private static Logger logger = LogManager.getLogger(FileSystemViewManager.class);
+
+  private final SerializableConfiguration conf;
+  // The View Storage config used to store file-system views
+  private final FileSystemViewStorageConfig viewConfig;
+  // Map from Base-Path to View
+  private final ConcurrentHashMap<String, HoodieView> globalViewMap;
+  // Factory Map to create file-system views
+  private final Function2<String, FileSystemViewStorageConfig, HoodieView> viewCreator;
+
+  public FileSystemViewManager(SerializableConfiguration conf, FileSystemViewStorageConfig viewConfig,
+      Function2<String, FileSystemViewStorageConfig, HoodieView> viewCreator) {
+    this.conf = conf;
+    this.viewConfig = viewConfig;
+    this.globalViewMap = new ConcurrentHashMap<>();
+    this.viewCreator = viewCreator;
+  }
+
+  /**
+   * Drops reference to File-System Views. Future calls to view results in creating a new view
+   * @param basePath
+   */
+  public void clearFileSystemView(String basePath) {
+    HoodieView view = globalViewMap.remove(basePath);
+    if (view != null) {
+      view.close();
+    }
+  }
+
+  /**
+   * Main API to get the file-system view for the base-path
+   * @param basePath
+   * @return
+   */
+  public HoodieView getFileSystemView(String basePath) {
+    return globalViewMap.computeIfAbsent(basePath,
+        (path) -> viewCreator.apply(path, viewConfig));
+  }
+
+  /**
+   * Closes all views opened
+   */
+  public void close() {
+    this.globalViewMap.values().stream().forEach(v -> v.close());
+    this.globalViewMap.clear();
+  }
+
+  // FACTORY METHODS FOR CREATING FILE-SYSTEM VIEWS
+
+  /**
+   * Create RocksDB based file System view for a dataset
+   * @param conf Hadoop Configuration
+   * @param viewConf View Storage Configuration
+   * @param basePath Base Path of dataset
+   * @return
+   */
+  private static RocksDbBasedFileSystemView createRocksDBBasedFileSystemView(SerializableConfiguration conf,
+      FileSystemViewStorageConfig viewConf, String basePath) {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.get(), basePath, true);
+    HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+    return new RocksDbBasedFileSystemView(metaClient, timeline, viewConf);
+  }
+
+  /**
+   * Create a spillable Map based file System view for a dataset
+   * @param conf Hadoop Configuration
+   * @param viewConf View Storage Configuration
+   * @param basePath Base Path of dataset
+   * @return
+   */
+  private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf,
+      FileSystemViewStorageConfig viewConf, String basePath) {
+    logger.warn("Creating SpillableMap based view for basePath " + basePath);
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.get(), basePath, true);
+    HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+    return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf);
+  }
+
+  /**
+   * Create an in-memory file System view for a dataset
+   * @param conf Hadoop Configuration
+   * @param viewConf View Storage Configuration
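The manager quoted above is essentially a lazy, per-base-path cache of views built on `ConcurrentHashMap.computeIfAbsent`. A stripped-down sketch of that caching behavior (the `View` interface and creator function are stand-ins for `HoodieView` and `Function2`, not the real types):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of the per-base-path view cache in FileSystemViewManager:
// views are created lazily, at most once per base path, via computeIfAbsent.
class ViewManagerSketch {

  interface View {
    void close();
  }

  private final ConcurrentHashMap<String, View> globalViewMap = new ConcurrentHashMap<>();
  private final Function<String, View> viewCreator;

  ViewManagerSketch(Function<String, View> viewCreator) {
    this.viewCreator = viewCreator;
  }

  // Lazily build (once) and return the view for this base path.
  View getFileSystemView(String basePath) {
    return globalViewMap.computeIfAbsent(basePath, viewCreator);
  }

  // Drop the cached view; the next lookup rebuilds it from scratch.
  void clearFileSystemView(String basePath) {
    View view = globalViewMap.remove(basePath);
    if (view != null) {
      view.close();
    }
  }
}
```

`computeIfAbsent` runs the creator at most once per key even under concurrent lookups, which is why no extra locking is needed around view creation.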
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274224252

## File path: hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java

## @@ -155,16 +166,39 @@ public R get(Object key) {
     if (entry == null) {
       return null;
     }
+    return get(entry);
+  }
+
+  private void close(RandomAccessFile file) {
+    if (file != null) {
+      try {
+        file.close();
+      } catch (IOException e) {
+        log.warn("Unable to close read pointer :", e);
+      }
+    }
+  }
+
+  private R get(ValueMetadata entry) {
+    RandomAccessFile file = getNewRandomAccessFile();
+    try {
+      return get(entry, file);
+    } finally {
+      close(file);
+    }
+  }
+
+  public static <R> R get(ValueMetadata entry, RandomAccessFile file) {
     try {
-      return SerializationUtils.deserialize(SpillableMapUtils
-          .readBytesFromDisk(readOnlyFileHandle, entry.getOffsetOfValue(), entry.getSizeOfValue()));
+      return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(file,
+          entry.getOffsetOfValue(), entry.getSizeOfValue()));
     } catch (IOException e) {
       throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
     }
   }

   @Override
-  public R put(T key, R value) {
+  public synchronized R put(T key, R value) {

Review comment: If we make random access file synchronized, this is not required right ?
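The change under discussion separates `DiskBasedMap`'s two access paths: writes go through one shared, serialized handle, while each `get` opens its own short-lived `RandomAccessFile` so readers never contend on a shared file pointer. A self-contained sketch of that split (the raw-UTF-8 record format here is a simplification, not `SpillableMapUtils`'s actual on-disk format):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;

// Sketch of the DiskBasedMap read/write strategy: one synchronized append path
// sharing a write handle, while each read opens a private RandomAccessFile so
// concurrent gets never share (or corrupt) a file pointer.
class DiskStoreSketch {
  private final File file;
  private final RandomAccessFile writeHandle;

  DiskStoreSketch(File file) throws IOException {
    this.file = file;
    this.writeHandle = new RandomAccessFile(file, "rw");
  }

  // Synchronized append: returns the offset at which the value was written.
  synchronized long put(String value) throws IOException {
    long offset = writeHandle.length();
    writeHandle.seek(offset);
    writeHandle.write(value.getBytes(StandardCharsets.UTF_8));
    return offset;
  }

  // Each get uses its own read handle, so readers need no synchronization.
  String get(long offset, int size) throws IOException {
    try (RandomAccessFile readHandle = new RandomAccessFile(file, "r")) {
      readHandle.seek(offset);
      byte[] buf = new byte[size];
      readHandle.readFully(buf);
      return new String(buf, StandardCharsets.UTF_8);
    }
  }
}
```

This split is one answer to the reviewer's question: when every read gets a private handle, only the writer's file pointer is shared, so only `put` needs `synchronized`.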
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274221364

## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java

(quoted diff omitted — it repeats the IncrementalTimelineSyncFileSystemView excerpt quoted in the first comment; the review comment itself was truncated in the archive)
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274219949

## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java

## @@ -16,99 +16,88 @@

 package com.uber.hoodie.common.table.view;

-import com.google.common.collect.ImmutableMap;
+import com.google.common.base.Preconditions;
 import com.uber.hoodie.common.model.CompactionOperation;
-import com.uber.hoodie.common.model.FileSlice;
-import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieFileGroup;
 import com.uber.hoodie.common.model.HoodieFileGroupId;
-import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.TableFileSystemView;
-import com.uber.hoodie.common.table.timeline.HoodieInstant;
-import com.uber.hoodie.common.util.CompactionUtils;
-import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.Option;
 import com.uber.hoodie.common.util.collection.Pair;
-import com.uber.hoodie.exception.HoodieIOException;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Predicate;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

 /**
- * Common abstract implementation for multiple TableFileSystemView Implementations. 2 possible
- * implementations are ReadOptimizedView and RealtimeView Concrete implementations extending
- * this abstract class, should only implement getDataFilesInPartition which includes files to be
- * included in the view
- *
+ * TableFileSystemView Implementations based on in-memory storage.
  * @see TableFileSystemView
  * @since 0.3.0
  */
-public class HoodieTableFileSystemView implements TableFileSystemView,
-    TableFileSystemView.ReadOptimizedView,
-    TableFileSystemView.RealtimeView, Serializable {
+public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystemView {

   private static Logger log = LogManager.getLogger(HoodieTableFileSystemView.class);

-  protected HoodieTableMetaClient metaClient;
-  // This is the commits that will be visible for all views extending this view
-  protected HoodieTimeline visibleActiveTimeline;
-  // mapping from partition paths to file groups contained within them
-  protected HashMap<String, List<HoodieFileGroup>> partitionToFileGroupsMap;
-  // mapping from file id to the file group.
-  protected HashMap<HoodieFileGroupId, HoodieFileGroup> fileGroupMap;
+  protected Map<String, List<HoodieFileGroup>> partitionToFileGroupsMap;

   /**
    * PartitionPath + File-Id to pending compaction instant time
    */
-  private final Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction;
+  private Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction;
+
+  /**
+   * Flag to determine if closed
+   */
+  private boolean closed = false;
+
+  public HoodieTableFileSystemView() {

Review comment: could you explain why we need this constructor (and that too public) ?
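The thread does not answer the question here, but a public no-arg constructor on a view class is most commonly needed for reflective instantiation, with a separate `init`-style call supplying the state a constructor would normally take. A sketch of that pattern (this motivation is an assumption, not something the PR states; `SampleView` and `create` are invented names):

```java
// Sketch of the usual reason a public no-arg constructor exists: the class is
// instantiated reflectively (e.g. by a pluggable factory configured with a
// class name), then initialized in a second step. Names here are illustrative.
class ReflectiveInitSketch {

  public static class SampleView {
    private String basePath;
    private boolean initialized = false;

    public SampleView() { }  // reflection requires this public no-arg constructor

    public void init(String basePath) {
      this.basePath = basePath;
      this.initialized = true;
    }

    public boolean isInitialized() {
      return initialized;
    }

    public String getBasePath() {
      return basePath;
    }
  }

  // Build a view from a class name, as a pluggable-view factory would.
  static SampleView create(String className, String basePath) throws Exception {
    SampleView view = (SampleView) Class.forName(className)
        .getDeclaredConstructor().newInstance();
    view.init(basePath);
    return view;
  }
}
```

The trade-off is that the object is not fully usable between construction and `init`, which is presumably what the reviewer is probing.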
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274219635

## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/FileSystemViewManager.java

(quoted diff omitted — it repeats the FileSystemViewManager excerpt quoted earlier in this thread; the review comment itself was truncated in the archive)
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274224378 ## File path: hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java ## @@ -215,7 +249,15 @@ public void clear() { @Override public Collection values() { -throw new HoodieException("Unsupported Operation Exception"); Review comment: This is something that will result in OOMing of the process if allowed to run; I don't think we should support this. I like the valueStream() that you provided below.
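The OOM concern behind rejecting values() while keeping a lazy valueStream() can be sketched as follows. This is an illustrative stand-in, not Hudi's actual DiskBasedMap: the in-memory map here stands in for the on-disk value store, and SpilledMapSketch and its members are hypothetical names.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Stream;

// Illustrative sketch only: SpilledMapSketch is not Hudi's DiskBasedMap.
class SpilledMapSketch<K, V> {
  // Stands in for the in-memory metadata that points at values spilled to disk.
  private final Map<K, V> metadata = new LinkedHashMap<>();

  void put(K key, V value) {
    metadata.put(key, value);
  }

  // Materializing every value at once defeats the point of spilling to disk
  // and can OOM the process, so the operation is rejected outright.
  Collection<V> values() {
    throw new UnsupportedOperationException("use valueStream() to avoid OOM");
  }

  // Lazy alternative: each value is looked up only when the stream consumes it,
  // so at most one value needs to be resident at a time.
  Stream<V> valueStream() {
    return metadata.keySet().stream().map(metadata::get);
  }
}
```

A caller that must process every value iterates the stream and lets each element be garbage-collected after use, rather than asking for the whole collection up front.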
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274219318 ## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/FileSystemViewManager.java ## @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.table.view; + +import com.uber.hoodie.common.SerializableConfiguration; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.HoodieView; +import com.uber.hoodie.common.util.Functions.Function2; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * A container that can potentially hold one or more dataset's + * file-system views. There is one view for each dataset. This is a view built against a timeline containing completed + * actions. In an embedded timeline-server mode, this typically holds only one dataset's view. + * In a stand-alone server mode, this can hold more than one dataset's views. 
+ */ +public class FileSystemViewManager { ... + private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf, + FileSystemViewStorageConfig viewConf, String basePath) { +logger.warn("Creating SpillableMap based view for basePath " + basePath); Review comment: info or debug ?
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274222871 ## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/SpillableMapBasedFileSystemView.java ## @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.table.view; + +import com.uber.hoodie.common.model.CompactionOperation; +import com.uber.hoodie.common.model.HoodieFileGroup; +import com.uber.hoodie.common.model.HoodieFileGroupId; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.util.DefaultSizeEstimator; +import com.uber.hoodie.common.util.collection.ExternalSpillableMap; +import com.uber.hoodie.common.util.collection.Pair; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Table FileSystemView implementation where view is stored in spillable disk using fixed memory + */ +public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { + + private static Logger log = 
LogManager.getLogger(SpillableMapBasedFileSystemView.class); + + private final long maxMemoryForFileGroupMap; + private final long maxMemoryForPendingCompaction; + private final String baseStoreDir; + + public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, + HoodieTimeline visibleActiveTimeline, FileSystemViewStorageConfig config) { +this.maxMemoryForFileGroupMap = config.getMaxMemoryForFileGroupMap(); +this.maxMemoryForPendingCompaction = config.getMaxMemoryForPendingCompaction(); Review comment: What is this used for ?
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274221885 ## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemViewDelegator.java ## @@ -0,0 +1,238 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.table.view; + +import com.uber.hoodie.common.model.CompactionOperation; +import com.uber.hoodie.common.model.FileSlice; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieFileGroup; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.HoodieView; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.Functions.Function0; +import com.uber.hoodie.common.util.Functions.Function1; +import com.uber.hoodie.common.util.Functions.Function2; +import com.uber.hoodie.common.util.Functions.Function3; +import com.uber.hoodie.common.util.Option; +import com.uber.hoodie.common.util.collection.Pair; +import java.io.Serializable; +import java.util.List; +import java.util.stream.Stream; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * A file system view which proxies request to a preferred File System View implementation. 
In case of error, + * flip all subsequent calls to a backup file-system view implementation. + */ +public class PriorityBasedFileSystemViewDelegator implements HoodieView, Serializable { Review comment: The name doesn't seem to do justice to the fact that it implements HoodieView. Maybe just call it "PriorityBasedFileSystemView"? I understand this is delegating; maybe just mention that in the comments.
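The delegation pattern under discussion — prefer a primary view, and after the first failure flip all subsequent calls to the backup — can be reduced to a small sketch. PriorityDelegator and its Supplier-based API are illustrative names, not the actual PriorityBasedFileSystemViewDelegator:

```java
import java.util.function.Supplier;

// Illustrative sketch of primary-with-fallback delegation. Once the primary
// errors, the flag flips permanently and the backup serves all later calls.
class PriorityDelegator<T> {
  private final Supplier<T> primary;
  private final Supplier<T> secondary;
  private boolean errored = false;

  PriorityDelegator(Supplier<T> primary, Supplier<T> secondary) {
    this.primary = primary;
    this.secondary = secondary;
  }

  T call() {
    if (!errored) {
      try {
        return primary.get();
      } catch (RuntimeException e) {
        errored = true; // flip: every subsequent call goes to the backup view
      }
    }
    return secondary.get();
  }
}
```

In the real class, each HoodieView method would route through logic like call(), which is why one shared error flag covers the whole API surface.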
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274221173 ## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java ## @@ -0,0 +1,329 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.table.view; + +import com.uber.hoodie.avro.model.HoodieCleanMetadata; +import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.avro.model.HoodieRestoreMetadata; +import com.uber.hoodie.avro.model.HoodieRollbackMetadata; +import com.uber.hoodie.common.model.CompactionOperation; +import com.uber.hoodie.common.model.FileSlice; +import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieFileGroup; +import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.AvroUtils; +import com.uber.hoodie.common.util.CompactionUtils; +import com.uber.hoodie.common.util.TimelineDiffHelper; +import com.uber.hoodie.common.util.TimelineDiffHelper.TimelineDiffResult; +import com.uber.hoodie.common.util.collection.Pair; +import 
com.uber.hoodie.exception.HoodieException; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Adds the capability to incrementally sync the changes to file-system view as and when new instants gets completed. + */ +public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTableFileSystemView { + + private static Logger log = LogManager.getLogger(IncrementalTimelineSyncFileSystemView.class); + + @Override + protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { +try { + TimelineDiffResult diffResult = TimelineDiffHelper.getNewInstantsForIncrementalSync(oldTimeline, newTimeline); + if (diffResult.canSyncIncrementally()) { +log.info("Doing incremental sync"); +runIncrementalSync(newTimeline, diffResult); +log.info("Finished incremental sync"); +// Reset timeline to latest +visibleActiveTimeline = newTimeline; + } else { +log.warn("Incremental Sync of timeline is deemed unsafe. Will revert to full syncing"); +super.runSync(oldTimeline, newTimeline); + } +} catch (Exception ioe) { + log.error("Got exception trying to perform incremental sync. Reverting to complete sync", ioe); + super.runSync(oldTimeline, newTimeline); +} + } + + /** + * Run incremental sync based on the diff result produced. 
+ * + * @param timeline New Timeline + * @param diffResult Timeline Diff Result + */ + private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diffResult) { + +log.info("Timeline Diff Result is :" + diffResult); + +// First remove pending compaction instants which were completed +diffResult.getFinishedCompactionInstants().stream().forEach(instant -> { + try { +removePendingCompactionInstant(timeline, instant); + } catch (IOException e) { +throw new HoodieException(e); + } +}); + +// Add new completed instants found in the latest timeline +diffResult.getNewlySeenInstants().stream() +.filter(instant -> instant.isCompleted() || instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) +.forEach(instant -> { + try { +if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) +|| instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) { + addCommitInstant(timeline, instant); +} else if (instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)) { + addRestoreInstant(timeline, instant); +} else if (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)) { + addCleanInstant(timeline, instant); +} else if
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274224728 ## File path: hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java ## @@ -70,23 +71,25 @@ public LazyFileIterator(RandomAccessFile file, Map o1, Map.Entry o2) -> o1.getValue().getOffsetOfValue().compareTo(o2.getValue().getOffsetOfValue())) .collect(Collectors.toList()).iterator(); + this.addShutdownHook(); } @Override public boolean hasNext() { - return this.metadataIterator.hasNext(); + boolean available = this.metadataIterator.hasNext(); + if (!available) { Review comment: Why close() in a hasNext() API? That's not uniform behavior.
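One conventional answer to the reviewer's objection is to make the iterator itself Closeable, so the caller (or a try-with-resources block) decides when the underlying file handle is released, rather than close() firing as a side effect of hasNext() returning false. A minimal sketch with illustrative names, not the actual LazyFileIterator:

```java
import java.io.Closeable;
import java.util.Iterator;

// Sketch: resource release is an explicit close(), not a hasNext() side effect.
class ClosingIterator<T> implements Iterator<T>, Closeable {
  private final Iterator<T> delegate;
  private boolean closed = false;

  ClosingIterator(Iterator<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean hasNext() {
    // A closed iterator simply reports exhaustion; it never closes itself.
    return !closed && delegate.hasNext();
  }

  @Override
  public T next() {
    return delegate.next();
  }

  @Override
  public void close() {
    closed = true; // a real version would release the file handle here
  }
}
```

Callers then get uniform behavior: hasNext() is a pure query, and cleanup happens exactly once, at a point the caller controls.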
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274217845 ## File path: hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java ## @@ -0,0 +1,828 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.table.view; + +import com.google.common.base.Preconditions; +import com.uber.hoodie.common.model.CompactionOperation; +import com.uber.hoodie.common.model.FileSlice; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieFileGroup; +import com.uber.hoodie.common.model.HoodieFileGroupId; +import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.HoodieView; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.CompactionUtils; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.Option; +import com.uber.hoodie.common.util.collection.Pair; +import com.uber.hoodie.exception.HoodieIOException; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import 
java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Common thread-safe implementation for multiple TableFileSystemView Implementations. + * Provides uniform handling of + * (a) Loading file-system views from underlying file-system + * (b) Pending compaction operations and changing file-system views based on that + * (c) Thread-safety in loading and managing file system views for this dataset. + * (d) resetting file-system views + * The actual mechanism of fetching file slices from different view storages is delegated to sub-classes. + */ +public abstract class AbstractTableFileSystemView implements HoodieView, Serializable { + + private static Logger log = LogManager.getLogger(AbstractTableFileSystemView.class); + + protected HoodieTableMetaClient metaClient; + + // This is the commits that will be visible for all views extending this view + protected HoodieTimeline visibleActiveTimeline; + + // Used to concurrently load and populate partition views + private ConcurrentHashMap addedPartitions = new ConcurrentHashMap<>(4096); + + // Locks to control concurrency. Sync operations use write-lock blocking all fetch operations. 
+ // For the common-case, we allow concurrent read of single or multiple partitions + private final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock(); + private final ReadLock readLock = globalLock.readLock(); + private final WriteLock writeLock = globalLock.writeLock(); + + private String getPartitionPathFromFilePath(String fullPath) { +return FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), new Path(fullPath).getParent()); + } + + /** + * Inisitalize the view. + */ + protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { +this.metaClient = metaClient; +this.visibleActiveTimeline = visibleActiveTimeline; +// Load Pending Compaction Operations +resetPendingCompactionOperations( +CompactionUtils.getAllPendingCompactionOperations(metaClient).values() +.stream().map(e -> Pair.of(e.getKey(), +CompactionOperation.convertFromAvroRecordInstance(e.getValue(); + } + + /** + * Adds the provided statuses into the file system view, and also caches it inside this object. + */ + protected List addFilesToView(FileStatus[] statuses) { +long beginFgTs = System.currentTimeMillis(); +
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274223908 ## File path: hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java ## @@ -74,6 +73,22 @@ protected DiskBasedMap(String baseFilePath) throws IOException { this.filePosition = new AtomicLong(0L); } + /** + * RandomAcessFile is not thread-safe. This API opens a new file handle and returns. + * @return + */ + private RandomAccessFile getNewRandomAccessFile() { Review comment: Instead of this, can we just create a synchronized version of RandomAccessFile? Wrap the class and implement a 1-writer-N-readers model, just like the one discussed above for AbstractFileSystem.
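The 1-writer-N-readers wrapper the reviewer suggests maps naturally onto a ReentrantReadWriteLock. A self-contained sketch, with a byte array standing in for the wrapped RandomAccessFile so the example runs without disk I/O; the class and method names are hypothetical:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative 1-writer/N-readers wrapper. A real version would guard a
// shared java.io.RandomAccessFile (whose file pointer is mutable state);
// here a byte[] stands in for the file contents.
class SynchronizedRandomAccess {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private byte[] data = new byte[0];

  byte[] read() {
    lock.readLock().lock(); // many readers may hold the read lock at once
    try {
      return data.clone();
    } finally {
      lock.readLock().unlock();
    }
  }

  void write(byte[] newData) {
    lock.writeLock().lock(); // exclusive: blocks all readers and writers
    try {
      data = newData.clone();
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```

The trade-off versus opening a fresh handle per reader is contention: a shared locked handle serializes seeks, while per-iterator handles let reads proceed fully in parallel at the cost of more open file descriptors.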
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support
n3nash commented on a change in pull request #600: Timeline Service with Incremental View Syncing support URL: https://github.com/apache/incubator-hudi/pull/600#discussion_r274224638 ## File path: hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java ## @@ -34,19 +31,19 @@ public class LazyFileIterable implements Iterable { // Used to access the value written at a specific position in the file - private final RandomAccessFile readOnlyFileHandle; + private final String filePath; // Stores the key and corresponding value's latest metadata spilled to disk private final Map inMemoryMetadataOfSpilledData; - public LazyFileIterable(RandomAccessFile file, Map map) { -this.readOnlyFileHandle = file; + public LazyFileIterable(String filePath, Map map) { +this.filePath = filePath; Review comment: What's the benefit of doing this?
[GitHub] [incubator-hudi] n3nash commented on issue #533: Hive sync error with
n3nash commented on issue #533: Hive sync error with URL: https://github.com/apache/incubator-hudi/issues/533#issuecomment-481901705 Closing this ticket in favor of #633, which fixes the underlying issue.
[incubator-hudi] branch master updated: Performing commit archiving in batches to avoid keeping a huge chunk in memory
This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new a8feee9  Performing commit archiving in batches to avoid keeping a huge chunk in memory
a8feee9 is described below

commit a8feee929394194922405bd12b330e40e9b710fe
Author: Nishith Agarwal
AuthorDate: Sun Apr 7 11:12:22 2019 -0700

    Performing commit archiving in batches to avoid keeping a huge chunk in memory
---
 .../com/uber/hoodie/config/HoodieCompactionConfig.java |  9 +
 .../java/com/uber/hoodie/config/HoodieWriteConfig.java |  5 +
 .../com/uber/hoodie/io/HoodieCommitArchiveLog.java     | 18 ++
 3 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
index 95e0c9b..dfd69c5 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
@@ -45,6 +45,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained";
   public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
   public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
+  public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
   // Upsert uses this file size to compact new data onto existing files..
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
   // By default, treat any file <= 100MB as a small file.
@@ -104,6 +105,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
   private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
   private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
+  private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10);
   public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target"
       + ".partitions";
   // 500GB of target IO per compaction (both read and write)
@@ -240,6 +242,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
       return this;
     }

+    public Builder withCommitsArchivalBatchSize(int batchSize) {
+      props.setProperty(COMMITS_ARCHIVAL_BATCH_SIZE_PROP, String.valueOf(batchSize));
+      return this;
+    }
+
     public HoodieCompactionConfig build() {
       HoodieCompactionConfig config = new HoodieCompactionConfig(props);
       setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP,
@@ -281,6 +288,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
           COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED);
       setDefaultOnCondition(props, !props.containsKey(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP),
           TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
+      setDefaultOnCondition(props, !props.containsKey(COMMITS_ARCHIVAL_BATCH_SIZE_PROP),
+          COMMITS_ARCHIVAL_BATCH_SIZE_PROP, DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE);

       HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 7156623..115dd51 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -249,6 +249,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
         .parseInt(props.getProperty(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP));
   }

+  public int getCommitArchivalBatchSize() {
+    return Integer
+        .parseInt(props.getProperty(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP));
+  }
+
   /**
    * index properties
   **/

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
index eb836d0..ccf303a 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
@@ -245,11 +245,11 @@ public class HoodieCommitArchiveLog {
     List records = new ArrayList<>();
     for (HoodieInstant hoodieInstant : instants) {
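The commit above limits memory use by writing archived commit records in fixed-size batches (the `hoodie.commits.archival.batch` property, default 10) instead of materializing every instant's records at once. Below is a minimal, self-contained sketch of that batching idea, assuming hypothetical names (`ArchiveBatching`, `partition`); it is not the actual `HoodieCommitArchiveLog` code, which operates on `HoodieInstant` and Avro records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ArchiveBatching {

    // Split the full list of instants into fixed-size batches, so that only
    // one batch of records needs to be held in memory while archiving.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            // subList is a view; each batch references the backing list
            batches.add(items.subList(i, Math.min(i + batchSize, items.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> instants = Arrays.asList("c1", "c2", "c3", "c4", "c5");
        // With a batch size of 2, five instants yield three batches.
        List<List<String>> batches = partition(instants, 2);
        System.out.println(batches.size()); // 3
    }
}
```

In the real code path, each batch would be converted to records and flushed to the archive log before the next batch is built, which is what keeps the peak memory footprint bounded by the batch size rather than the total number of commits being archived.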
[GitHub] [incubator-hudi] n3nash commented on issue #635: Add empty payload class to support deletes via apache spark
n3nash commented on issue #635: Add empty payload class to support deletes via apache spark URL: https://github.com/apache/incubator-hudi/pull/635#issuecomment-481815391 @lyogev Thanks for this contribution! I left you a couple of minor comments; otherwise things look good. Also, could you squash your commits into one commit and look into why the build is failing? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #635: Add empty payload class to support deletes via apache spark
n3nash commented on a change in pull request #635: Add empty payload class to support deletes via apache spark URL: https://github.com/apache/incubator-hudi/pull/635#discussion_r274104385 ## File path: hoodie-spark/src/main/java/com/uber/hoodie/EmptyHoodieRecordPayload.java ## @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie; + +import com.uber.hoodie.common.model.HoodieRecordPayload; +import java.util.Optional; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +/** + * Empty payload used for deletions in apache spark. Review comment: Just keep it to: Empty payload used for deletions
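The `EmptyHoodieRecordPayload` under review implements deletion by being a payload that always resolves to "no record". A minimal, self-contained sketch of that idea is below; the class and method names mirror the payload contract only loosely, since the real class implements Hudi's `HoodieRecordPayload` against Avro `IndexedRecord`/`Schema` types that are not reproduced here.

```java
import java.util.Optional;

// Illustrative stand-in for an "empty" record payload used to express deletes:
// both the insert value and the merged update value are empty, so the merge
// step emits nothing and the target record is dropped.
public class EmptyPayloadSketch {

    // No value on insert: nothing is written for this key.
    public Optional<Object> getInsertValue() {
        return Optional.empty();
    }

    // No value on merge either, regardless of the existing record:
    // the existing record for this key is effectively deleted.
    public Optional<Object> combineAndGetUpdateValue(Object currentValue) {
        return Optional.empty();
    }
}
```

Upserting records with such a payload from Spark then acts as a delete: during merge, keys whose payload yields `Optional.empty()` simply do not appear in the output file.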
[GitHub] [incubator-hudi] n3nash commented on a change in pull request #635: Add empty payload class to support deletes via apache spark
n3nash commented on a change in pull request #635: Add empty payload class to support deletes via apache spark URL: https://github.com/apache/incubator-hudi/pull/635#discussion_r274104195 ## File path: hoodie-spark/src/main/java/com/uber/hoodie/EmptyHoodieRecordPayload.java ## @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com) Review comment: nit: change this to 2019, please
[GitHub] [incubator-hudi] lyogev opened a new pull request #635: Add empty payload class to support deletes via apache spark
lyogev opened a new pull request #635: Add empty payload class to support deletes via apache spark URL: https://github.com/apache/incubator-hudi/pull/635
[GitHub] [incubator-hudi] lyogev commented on issue #498: Is there any record delete examples?
lyogev commented on issue #498: Is there any record delete examples? URL: https://github.com/apache/incubator-hudi/issues/498#issuecomment-481792830 Opened #635
[GitHub] [incubator-hudi] lyogev commented on issue #625: fix(spark): fix shading for spark bundle to include all hive's libraries unshaded
lyogev commented on issue #625: fix(spark): fix shading for spark bundle to include all hive's libraries unshaded URL: https://github.com/apache/incubator-hudi/pull/625#issuecomment-481774903 This now works perfectly with 0.4.6-SNAPSHOT, thanks to the fixes in #633 and #620. Thanks!
[GitHub] [incubator-hudi] lyogev closed pull request #625: fix(spark): fix shading for spark bundle to include all hive's libraries unshaded
lyogev closed pull request #625: fix(spark): fix shading for spark bundle to include all hive's libraries unshaded URL: https://github.com/apache/incubator-hudi/pull/625
[GitHub] [incubator-hudi] n3nash edited a comment on issue #623: Hudi Test Suite
n3nash edited a comment on issue #623: Hudi Test Suite URL: https://github.com/apache/incubator-hudi/pull/623#issuecomment-481403824 Created a new module `hoodie-bench`.

Current limitations & workarounds:
- You cannot specify which particular partitions to generate updates for; the suite generates updates across any N partitions. However, if you want to test updates for a single partition, you can simply generate a dataset with one partition and then generate updates for it.
- There is no concrete implementation around compaction, scheduling compaction, or rollback, but the basic building blocks have been implemented.