Hisoka-X commented on code in PR #3637:
URL: 
https://github.com/apache/incubator-seatunnel/pull/3637#discussion_r1051772025


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java:
##########
@@ -148,6 +149,8 @@ public IdentifiedDataSerializable create(int typeId) {
                     return new CleanTaskGroupContextOperation();
                 case SOURCE_READER_EVENT_OPERATOR:
                     return new SourceReaderEventOperation();
+                case CHECK_TASKGROUP_IS_EXECUTING:

Review Comment:
   Any Code use this? I can't find it.



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -612,58 +600,106 @@ public void testStreamJobRestoreIn3NodeMasterDown() 
throws ExecutionException, I
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled("Wait for open Imap storage")
     public void testStreamJobRestoreInAllNodeDown() throws ExecutionException, 
InterruptedException {
         String testCaseName = "testStreamJobRestoreInAllNodeDown";
         String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRestoreInAllNodeDown";
-        long testRowNumber = 1000;
+        int testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
         HazelcastInstanceImpl node2 = null;
         HazelcastInstanceImpl node3 = null;
         SeaTunnelClient engineClient = null;
 
         try {
-            node1 = SeaTunnelServerStarter.createHazelcastInstance(
-                    TestUtils.getClusterName(testClusterName));
+            String yaml = "#\n" +

Review Comment:
   Hi, Why not put it in resource directory?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/FileMapStore.java:
##########
@@ -17,58 +17,84 @@
 
 package org.apache.seatunnel.engine.server.persistence;
 
+import static 
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY;
+
+import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
+import org.apache.seatunnel.engine.imap.storage.file.IMapFileStorage;
+
+import com.google.common.collect.Maps;
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.map.MapLoaderLifecycleSupport;
 import com.hazelcast.map.MapStore;
+import lombok.SneakyThrows;
+import org.apache.hadoop.conf.Configuration;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 public class FileMapStore implements MapStore<Object, Object>, 
MapLoaderLifecycleSupport {
-    //TODO Wait for the file Kv storage development to complete
+
+    private IMapStorage mapStorage;
 
     @Override
     public void init(HazelcastInstance hazelcastInstance, Properties 
properties, String mapName) {
+        // TODO implemented by loading the factory
+        mapStorage = new IMapFileStorage();
+        Map<String, Object> initMap = new 
HashMap<>(Maps.fromProperties(properties));
+        Configuration configuration = new Configuration();
+        configuration.set("fs.defaultFS", 
properties.getProperty("fs.defaultFS"));
+        initMap.put(HDFS_CONFIG_KEY, configuration);
+        mapStorage.initialize(initMap);
+
     }
 
     @Override
     public void destroy() {
-
+        mapStorage.destroy(false);
     }
 
     @Override
     public void store(Object key, Object value) {
+        mapStorage.store(key, value);
     }
 
     @Override
     public void storeAll(Map<Object, Object> map) {
-
+        mapStorage.storeAll(map);
     }
 
     @Override
     public void delete(Object key) {
-
+        mapStorage.delete(key);
     }
 
     @Override
     public void deleteAll(Collection<Object> keys) {
-
+        mapStorage.deleteAll(keys);
     }
 
+    @SneakyThrows
     @Override
-    public String load(Object key) {
+    public Object load(Object key) {
         return null;
     }
 
+    @SneakyThrows
     @Override
     public Map<Object, Object> loadAll(Collection<Object> keys) {
-        return null;
+        Map<Object, Object> allMap = mapStorage.loadAll();
+        Map<Object, Object> retMap = new HashMap<>(keys.size());

Review Comment:
   ```suggestion
           Map<Object, Object> retMap = new HashMap<>();
   ```
   The HashMap parameter are not init size, it init capcity.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/FileMapStore.java:
##########
@@ -17,58 +17,84 @@
 
 package org.apache.seatunnel.engine.server.persistence;
 
+import static 
org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY;
+
+import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
+import org.apache.seatunnel.engine.imap.storage.file.IMapFileStorage;
+
+import com.google.common.collect.Maps;
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.map.MapLoaderLifecycleSupport;
 import com.hazelcast.map.MapStore;
+import lombok.SneakyThrows;
+import org.apache.hadoop.conf.Configuration;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 public class FileMapStore implements MapStore<Object, Object>, 
MapLoaderLifecycleSupport {
-    //TODO Wait for the file Kv storage development to complete
+
+    private IMapStorage mapStorage;
 
     @Override
     public void init(HazelcastInstance hazelcastInstance, Properties 
properties, String mapName) {
+        // TODO implemented by loading the factory
+        mapStorage = new IMapFileStorage();
+        Map<String, Object> initMap = new 
HashMap<>(Maps.fromProperties(properties));
+        Configuration configuration = new Configuration();
+        configuration.set("fs.defaultFS", 
properties.getProperty("fs.defaultFS"));
+        initMap.put(HDFS_CONFIG_KEY, configuration);
+        mapStorage.initialize(initMap);
+
     }
 
     @Override
     public void destroy() {
-
+        mapStorage.destroy(false);
     }
 
     @Override
     public void store(Object key, Object value) {
+        mapStorage.store(key, value);
     }
 
     @Override
     public void storeAll(Map<Object, Object> map) {
-
+        mapStorage.storeAll(map);
     }
 
     @Override
     public void delete(Object key) {
-
+        mapStorage.delete(key);
     }
 
     @Override
     public void deleteAll(Collection<Object> keys) {
-
+        mapStorage.deleteAll(keys);
     }
 
+    @SneakyThrows
     @Override
-    public String load(Object key) {
+    public Object load(Object key) {
         return null;
     }
 
+    @SneakyThrows
     @Override
     public Map<Object, Object> loadAll(Collection<Object> keys) {
-        return null;
+        Map<Object, Object> allMap = mapStorage.loadAll();
+        Map<Object, Object> retMap = new HashMap<>(keys.size());

Review Comment:
   Or just use `retMap.putAll(allMap);`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to