[GitHub] [hudi] vinothchandar commented on a change in pull request #3194: [HUDI-2028] Implement RockDbBasedMap as an alternate to DiskBasedMap in ExternalSpillableMap

2021-07-06 Thread GitBox


vinothchandar commented on a change in pull request #3194:
URL: https://github.com/apache/hudi/pull/3194#discussion_r664155915



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * This class provides a disk spillable only map implementation.
+ * All of the data is stored using the RocksDB implementation.
+ */
+public final class RocksDbDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {
+  // ColumnFamily allows partitioning data within RocksDB, which allows
+  // independent configuration and faster deletes across partitions
+  // https://github.com/facebook/rocksdb/wiki/Column-Families
+  // For this use case, we use a single static column family/ partition
+  //
+  private static final String COLUMN_FAMILY_NAME = "spill_map";
+
+  private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class);
+  // Stores the key and corresponding value's latest metadata spilled to disk
+  private final Set<T> keySet;
+  private final String rocksDbStoragePath;
+  private RocksDBDAO rocksDb;
+
+  public RocksDbDiskMap(String rocksDbStoragePath) throws IOException {
+    this.keySet = new HashSet<>();
+    this.rocksDbStoragePath = rocksDbStoragePath;
+  }
+
+  @Override
+  public int size() {
+    return keySet.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return keySet.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return keySet.contains((T) key);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new HoodieNotSupportedException("unable to compare values in map");
+  }
+
+  @Override
+  public R get(Object key) {
+    if (!containsKey(key)) {
+      return null;
+    }
+    return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key);
+  }
+
+  @Override
+  public R put(T key, R value) {
+    getRocksDb().put(COLUMN_FAMILY_NAME, key, value);
+    keySet.add(key);
+    return value;
+  }
+
+  @Override
+  public R remove(Object key) {
+    R value = get(key);
+    if (value != null) {
+      keySet.remove((T) key);
+      getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key);
+    }
+    return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends T, ? extends R> keyValues) {
+    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value)));
+    keySet.addAll(keyValues.keySet());
+  }
+
+  @Override
+  public void clear() {
+    close();
+  }
+
+  @Override
+  public Set<T> keySet() {
+    return keySet;
+  }
+
+  @Override
+  public Collection<R> values() {
+    throw new HoodieException("Unsupported Operation Exception");
+  }
+
+  @Override
+  public Set<Map.Entry<T, R>> entrySet() {
+    Set<Map.Entry<T, R>> entrySet = new HashSet<>();
+    for (T key : keySet) {
+      entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key)));
+    }
+    return entrySet;
+  }
+
+  /**
+   * Custom iterator to iterate over values written to disk.
+   */
+  @Override
+  public Iterator<R> iterator() {
+    return getRocksDb().iterator(COLUMN_FAMILY_NAME);
+  }
+
+  @Override
+  public Stream<R> valueStream() {
+    return keySet.stream().map(key -> (R) get(key));
+  }
+
+  @Override
+  public long sizeOfFileOnDiskInBytes() {
+    return getRocksDb().getTotalBytesWritten();
+  }
+
+  @Override
+  public void close() {
+    keySet.clear();
+    if (null != rocksDb) {
+      rocksDb.close();
+    }
+    rocksDb = null;
+  }
+
+  private RocksDBDAO getRocksDb() {
+    if (null == rocksDb) {
+      synchronized (this) {

Review comment:
   Ok makes sense
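
   For readers skimming the diff above, here is a minimal, hypothetical usage sketch of the RocksDbDiskMap API it introduces. The storage path, the String key/value types, and the standalone main method are illustrative assumptions only; in the PR itself the map is intended to be plugged into ExternalSpillableMap as an alternative to DiskBasedMap.

   import org.apache.hudi.common.util.collection.RocksDbDiskMap;

   import java.io.IOException;
   import java.util.HashMap;
   import java.util.Map;

   public class RocksDbDiskMapSketch {
     public static void main(String[] args) throws IOException {
       // Hypothetical storage path; the constructor only records it, the RocksDB
       // instance is opened lazily on first access via getRocksDb().
       RocksDbDiskMap<String, String> spillMap = new RocksDbDiskMap<>("/tmp/hoodie-spill-map");
       try {
         // Single put: writes straight to the "spill_map" column family and tracks the key in memory.
         spillMap.put("key1", "value1");

         // putAll: goes through a RocksDB WriteBatch, so the batch is written atomically.
         Map<String, String> batch = new HashMap<>();
         batch.put("key2", "value2");
         batch.put("key3", "value3");
         spillMap.putAll(batch);

         // Reads are served from RocksDB; only the key set lives in memory.
         System.out.println(spillMap.get("key2") + ", size=" + spillMap.size());
       } finally {
         // close() clears the tracked key set and releases the RocksDB handle.
         spillMap.close();
       }
     }
   }

   Note that values() and containsValue(...) are unsupported by design in this class, and clear() simply delegates to close().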




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

[GitHub] [hudi] vinothchandar commented on a change in pull request #3194: [HUDI-2028] Implement RockDbBasedMap as an alternate to DiskBasedMap in ExternalSpillableMap

2021-07-02 Thread GitBox


vinothchandar commented on a change in pull request #3194:
URL: https://github.com/apache/hudi/pull/3194#discussion_r663300797



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * This class provides a disk spillable only map implementation.
+ * All of the data is stored using the RocksDB implementation.
+ */
+public final class RocksDbDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {
+  // ColumnFamily allows partitioning data within RocksDB, which allows
+  // independent configuration and faster deletes across partitions
+  // https://github.com/facebook/rocksdb/wiki/Column-Families
+  // For this use case, we use a single static column family/ partition
+  //
+  private static final String COLUMN_FAMILY_NAME = "spill_map";
+
+  private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class);
+  // Stores the key and corresponding value's latest metadata spilled to disk
+  private final Set<T> keySet;
+  private final String rocksDbStoragePath;
+  private RocksDBDAO rocksDb;
+
+  public RocksDbDiskMap(String rocksDbStoragePath) throws IOException {
+    this.keySet = new HashSet<>();
+    this.rocksDbStoragePath = rocksDbStoragePath;
+  }
+
+  @Override
+  public int size() {
+    return keySet.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return keySet.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return keySet.contains((T) key);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new HoodieNotSupportedException("unable to compare values in map");
+  }
+
+  @Override
+  public R get(Object key) {
+    if (!containsKey(key)) {
+      return null;
+    }
+    return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key);
+  }
+
+  @Override
+  public R put(T key, R value) {
+    getRocksDb().put(COLUMN_FAMILY_NAME, key, value);
+    keySet.add(key);
+    return value;
+  }
+
+  @Override
+  public R remove(Object key) {
+    R value = get(key);
+    if (value != null) {
+      keySet.remove((T) key);
+      getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key);
+    }
+    return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends T, ? extends R> keyValues) {
+    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value)));
+    keySet.addAll(keyValues.keySet());
+  }
+
+  @Override
+  public void clear() {
+    close();
+  }
+
+  @Override
+  public Set<T> keySet() {
+    return keySet;
+  }
+
+  @Override
+  public Collection<R> values() {
+    throw new HoodieException("Unsupported Operation Exception");
+  }
+
+  @Override
+  public Set<Map.Entry<T, R>> entrySet() {
+    Set<Map.Entry<T, R>> entrySet = new HashSet<>();
+    for (T key : keySet) {
+      entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key)));
+    }
+    return entrySet;
+  }
+
+  /**
+   * Custom iterator to iterate over values written to disk.
+   */
+  @Override
+  public Iterator<R> iterator() {
+    return getRocksDb().iterator(COLUMN_FAMILY_NAME);
+  }
+
+  @Override
+  public Stream<R> valueStream() {
+    return keySet.stream().map(key -> (R) get(key));
+  }
+
+  @Override
+  public long sizeOfFileOnDiskInBytes() {
+    return getRocksDb().getTotalBytesWritten();
+  }
+
+  @Override
+  public void close() {
+    keySet.clear();
+    if (null != rocksDb) {
+      rocksDb.close();
+    }
+    rocksDb = null;
+  }
+
+  private RocksDBDAO getRocksDb() {
+    if (null == rocksDb) {
+      synchronized (this) {
+

Review comment:
   if there is no actual contention, this is very cheap actually. but +1 on 
the general point
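
   The comment refers to the lazy opening of the RocksDB handle inside a synchronized block in getRocksDb(), whose body is cut off in the diff above. As a hedged illustration of the general point only (not the PR's actual code), the usual double-checked-locking shape of such lazy initialization looks like the sketch below; the LazyHolder and Supplier names are illustrative. An uncontended synchronized block is indeed cheap, and the unsynchronized first check avoids even that cost once the field is initialized.

   import java.util.function.Supplier;

   // Generic lazy-initialization holder using double-checked locking:
   // the fast path reads a volatile field without taking the lock, and the
   // synchronized block is only entered (and only contended) during the first call.
   public final class LazyHolder<T> {
     private final Supplier<T> factory;
     private volatile T instance; // volatile publishes the fully constructed instance safely

     public LazyHolder(Supplier<T> factory) {
       this.factory = factory;
     }

     public T get() {
       T local = instance;        // first check, no lock: free once initialized
       if (local == null) {
         synchronized (this) {    // lock held only while initializing
           local = instance;      // second check: another thread may have won the race
           if (local == null) {
             local = factory.get();
             instance = local;
           }
         }
       }
       return local;
     }
   }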




-- 
This is an automated message from the Apache Git Service.