[GitHub] [hudi] vinothchandar commented on a change in pull request #3194: [HUDI-2028] Implement RockDbBasedMap as an alternate to DiskBasedMap in ExternalSpillableMap
vinothchandar commented on a change in pull request #3194: URL: https://github.com/apache/hudi/pull/3194#discussion_r664155915 ## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.collection; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieNotSupportedException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +/** + * This class provides a disk spillable only map implementation. + * All of the data is stored using the RocksDB implementation. + */ +public final class RocksDbDiskMap implements DiskMap { + // ColumnFamily allows partitioning data within RockDB, which allows + // independent configuration and faster deletes across partitions + // https://github.com/facebook/rocksdb/wiki/Column-Families + // For this use case, we use a single static column family/ partition + // + private static final String COLUMN_FAMILY_NAME = "spill_map"; + + private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class); + // Stores the key and corresponding value's latest metadata spilled to disk + private final Set keySet; + private final String rocksDbStoragePath; + private RocksDBDAO rocksDb; + + public RocksDbDiskMap(String rocksDbStoragePath) throws IOException { +this.keySet = new HashSet<>(); +this.rocksDbStoragePath = rocksDbStoragePath; + } + + @Override + public int size() { +return keySet.size(); + } + + @Override + public boolean isEmpty() { +return keySet.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { +return keySet.contains((T) key); + } + + @Override + public boolean containsValue(Object value) { +throw new HoodieNotSupportedException("unable to compare values in map"); + } + + @Override + public R get(Object key) { +if (!containsKey(key)) { + return null; +} +return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key); + } + + @Override + public R put(T key, R value) { +getRocksDb().put(COLUMN_FAMILY_NAME, key, value); +keySet.add(key); +return value; + } + + @Override + public R remove(Object key) { +R value = get(key); +if (value != null) { + keySet.remove((T) key); + getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key); +} +return value; + } + + @Override + public void putAll(Map keyValues) { +getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value))); +keySet.addAll(keyValues.keySet()); + } + + @Override + public void clear() { +close(); + } + + @Override + public Set keySet() { +return keySet; + } + + @Override + public Collection values() { +throw new HoodieException("Unsupported Operation Exception"); + } + + @Override + public Set> entrySet() { +Set> entrySet = new HashSet<>(); +for (T key : keySet) { + entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key))); +} +return entrySet; + } + + /** + * Custom iterator to iterate over values written to disk. + */ + @Override + public Iterator iterator() { +return getRocksDb().iterator(COLUMN_FAMILY_NAME); + } + + @Override + public Stream valueStream() { +return keySet.stream().map(key -> (R) get(key)); + } + + @Override + public long sizeOfFileOnDiskInBytes() { +return getRocksDb().getTotalBytesWritten(); + } + + @Override + public void close() { +keySet.clear(); +if (null != rocksDb) { + rocksDb.close(); +} +rocksDb = null; + } + + private RocksDBDAO getRocksDb() { +if (null == rocksDb) { + synchronized (this) { Review comment: Ok makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use
[GitHub] [hudi] vinothchandar commented on a change in pull request #3194: [HUDI-2028] Implement RockDbBasedMap as an alternate to DiskBasedMap in ExternalSpillableMap
vinothchandar commented on a change in pull request #3194: URL: https://github.com/apache/hudi/pull/3194#discussion_r664155915 ## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.collection; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieNotSupportedException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +/** + * This class provides a disk spillable only map implementation. + * All of the data is stored using the RocksDB implementation. + */ +public final class RocksDbDiskMap implements DiskMap { + // ColumnFamily allows partitioning data within RockDB, which allows + // independent configuration and faster deletes across partitions + // https://github.com/facebook/rocksdb/wiki/Column-Families + // For this use case, we use a single static column family/ partition + // + private static final String COLUMN_FAMILY_NAME = "spill_map"; + + private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class); + // Stores the key and corresponding value's latest metadata spilled to disk + private final Set keySet; + private final String rocksDbStoragePath; + private RocksDBDAO rocksDb; + + public RocksDbDiskMap(String rocksDbStoragePath) throws IOException { +this.keySet = new HashSet<>(); +this.rocksDbStoragePath = rocksDbStoragePath; + } + + @Override + public int size() { +return keySet.size(); + } + + @Override + public boolean isEmpty() { +return keySet.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { +return keySet.contains((T) key); + } + + @Override + public boolean containsValue(Object value) { +throw new HoodieNotSupportedException("unable to compare values in map"); + } + + @Override + public R get(Object key) { +if (!containsKey(key)) { + return null; +} +return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key); + } + + @Override + public R put(T key, R value) { +getRocksDb().put(COLUMN_FAMILY_NAME, key, value); +keySet.add(key); +return value; + } + + @Override + public R remove(Object key) { +R value = get(key); +if (value != null) { + keySet.remove((T) key); + getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key); +} +return value; + } + + @Override + public void putAll(Map keyValues) { +getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value))); +keySet.addAll(keyValues.keySet()); + } + + @Override + public void clear() { +close(); + } + + @Override + public Set keySet() { +return keySet; + } + + @Override + public Collection values() { +throw new HoodieException("Unsupported Operation Exception"); + } + + @Override + public Set> entrySet() { +Set> entrySet = new HashSet<>(); +for (T key : keySet) { + entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key))); +} +return entrySet; + } + + /** + * Custom iterator to iterate over values written to disk. + */ + @Override + public Iterator iterator() { +return getRocksDb().iterator(COLUMN_FAMILY_NAME); + } + + @Override + public Stream valueStream() { +return keySet.stream().map(key -> (R) get(key)); + } + + @Override + public long sizeOfFileOnDiskInBytes() { +return getRocksDb().getTotalBytesWritten(); + } + + @Override + public void close() { +keySet.clear(); +if (null != rocksDb) { + rocksDb.close(); +} +rocksDb = null; + } + + private RocksDBDAO getRocksDb() { +if (null == rocksDb) { + synchronized (this) { Review comment: Ok makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use
[GitHub] [hudi] vinothchandar commented on a change in pull request #3194: [HUDI-2028] Implement RockDbBasedMap as an alternate to DiskBasedMap in ExternalSpillableMap
vinothchandar commented on a change in pull request #3194: URL: https://github.com/apache/hudi/pull/3194#discussion_r663300797 ## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.collection; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieNotSupportedException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +/** + * This class provides a disk spillable only map implementation. + * All of the data is stored using the RocksDB implementation. + */ +public final class RocksDbDiskMap implements DiskMap { + // ColumnFamily allows partitioning data within RockDB, which allows + // independent configuration and faster deletes across partitions + // https://github.com/facebook/rocksdb/wiki/Column-Families + // For this use case, we use a single static column family/ partition + // + private static final String COLUMN_FAMILY_NAME = "spill_map"; + + private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class); + // Stores the key and corresponding value's latest metadata spilled to disk + private final Set keySet; + private final String rocksDbStoragePath; + private RocksDBDAO rocksDb; + + public RocksDbDiskMap(String rocksDbStoragePath) throws IOException { +this.keySet = new HashSet<>(); +this.rocksDbStoragePath = rocksDbStoragePath; + } + + @Override + public int size() { +return keySet.size(); + } + + @Override + public boolean isEmpty() { +return keySet.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { +return keySet.contains((T) key); + } + + @Override + public boolean containsValue(Object value) { +throw new HoodieNotSupportedException("unable to compare values in map"); + } + + @Override + public R get(Object key) { +if (!containsKey(key)) { + return null; +} +return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key); + } + + @Override + public R put(T key, R value) { +getRocksDb().put(COLUMN_FAMILY_NAME, key, value); +keySet.add(key); +return value; + } + + @Override + public R remove(Object key) { +R value = get(key); +if (value != null) { + keySet.remove((T) key); + getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key); +} +return value; + } + + @Override + public void putAll(Map keyValues) { +getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value))); +keySet.addAll(keyValues.keySet()); + } + + @Override + public void clear() { +close(); + } + + @Override + public Set keySet() { +return keySet; + } + + @Override + public Collection values() { +throw new HoodieException("Unsupported Operation Exception"); + } + + @Override + public Set> entrySet() { +Set> entrySet = new HashSet<>(); +for (T key : keySet) { + entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key))); +} +return entrySet; + } + + /** + * Custom iterator to iterate over values written to disk. + */ + @Override + public Iterator iterator() { +return getRocksDb().iterator(COLUMN_FAMILY_NAME); + } + + @Override + public Stream valueStream() { +return keySet.stream().map(key -> (R) get(key)); + } + + @Override + public long sizeOfFileOnDiskInBytes() { +return getRocksDb().getTotalBytesWritten(); + } + + @Override + public void close() { +keySet.clear(); +if (null != rocksDb) { + rocksDb.close(); +} +rocksDb = null; + } + + private RocksDBDAO getRocksDb() { +if (null == rocksDb) { + synchronized (this) { Review comment: if there is no actual contention, this is very cheap actually. but +1 on the general point -- This is an automated message from the