liugddx commented on code in PR #3637:
URL:
https://github.com/apache/incubator-seatunnel/pull/3637#discussion_r1051779552
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/FileMapStore.java:
##########
@@ -17,58 +17,84 @@
package org.apache.seatunnel.engine.server.persistence;
+import static
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY;
+
+import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
+import org.apache.seatunnel.engine.imap.storage.file.IMapFileStorage;
+
+import com.google.common.collect.Maps;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.MapLoaderLifecycleSupport;
import com.hazelcast.map.MapStore;
+import lombok.SneakyThrows;
+import org.apache.hadoop.conf.Configuration;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class FileMapStore implements MapStore<Object, Object>,
MapLoaderLifecycleSupport {
- //TODO Wait for the file Kv storage development to complete
+
+ private IMapStorage mapStorage;
@Override
public void init(HazelcastInstance hazelcastInstance, Properties
properties, String mapName) {
+ // TODO implemented by loading the factory
+ mapStorage = new IMapFileStorage();
+ Map<String, Object> initMap = new
HashMap<>(Maps.fromProperties(properties));
+ Configuration configuration = new Configuration();
+ configuration.set("fs.defaultFS",
properties.getProperty("fs.defaultFS"));
+ initMap.put(HDFS_CONFIG_KEY, configuration);
+ mapStorage.initialize(initMap);
+
}
@Override
public void destroy() {
-
+ mapStorage.destroy(false);
}
@Override
public void store(Object key, Object value) {
+ mapStorage.store(key, value);
}
@Override
public void storeAll(Map<Object, Object> map) {
-
+ mapStorage.storeAll(map);
}
@Override
public void delete(Object key) {
-
+ mapStorage.delete(key);
}
@Override
public void deleteAll(Collection<Object> keys) {
-
+ mapStorage.deleteAll(keys);
}
+ @SneakyThrows
@Override
- public String load(Object key) {
+ public Object load(Object key) {
return null;
}
+ @SneakyThrows
@Override
public Map<Object, Object> loadAll(Collection<Object> keys) {
- return null;
+ Map<Object, Object> allMap = mapStorage.loadAll();
+ Map<Object, Object> retMap = new HashMap<>(keys.size());
Review Comment:
When I tested this, some keys appeared that should not be in the returned
map. Therefore, you cannot directly use `retMap.putAll(allMap);`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]