muyangye commented on code in PR #2430:
URL: https://github.com/apache/streampipes/pull/2430#discussion_r1512706501


##########
streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v095/MergeFilenamesAndRenameDuplicatesMigration.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.migrations.v095;
+
+import org.apache.streampipes.manager.file.FileHandler;
+import org.apache.streampipes.model.file.FileMetadata;
+import org.apache.streampipes.service.core.migrations.Migration;
+import org.apache.streampipes.storage.api.IFileMetadataStorage;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.lightcouch.CouchDbClient;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MergeFilenamesAndRenameDuplicatesMigration implements Migration {
+
+  protected static final String ORIGINAL_FILENAME = "originalFilename";
+  protected static final String INTERNAL_FILENAME = "internalFilename";
+  protected static final String ID = "_id";
+  protected static final String FILETYPE = "filetype";
+
+  private CouchDbClient couchDbClient;
+
+  private ObjectMapper mapper = new ObjectMapper();
+
+  private IFileMetadataStorage fileMetadataStorage =
+      StorageDispatcher.INSTANCE.getNoSqlStore().getFileMetadataStorage();
+
+  private FileHandler fileHandler = new FileHandler();
+
+  protected Map<String, List<FileMetadata>> fileMetadataGroupedByOriginalName 
= new HashMap<>();
+
+  private boolean isTesting = false;
+
+  public MergeFilenamesAndRenameDuplicatesMigration(boolean testing) {
+    isTesting = testing;
+  }
+
+  public MergeFilenamesAndRenameDuplicatesMigration() {
+    couchDbClient = Utils.getCouchDbFileMetadataClient();
+  }
+
+  // Starting from v0.95, StreamPipes will use a single file name as the 
unique identifier of files instead of an
+  // internal filename and an original filename. This migration merges them 
and renames all the files that have
+  // duplicate names to ensure uniqueness
+  @Override
+  public boolean shouldExecute() {
+    return true;
+  }
+
+  @Override
+  public void executeMigration() {
+    var couchDbRawFileMetadata = 
getCouchDbRawFileMetadata(getAllFileIds(fileMetadataStorage));
+    getFileMetadataToUpdate(couchDbRawFileMetadata);
+    fileMetadataGroupedByOriginalName.forEach(
+        (originalFilename, fileMetadataList) -> update(originalFilename, 
fileMetadataList));
+  }
+
+  /**
+   * Gets all fileMetadata that need to be updated grouped by originalFilename
+   * key is (possibly) duplicated originalFilename and value is that file's 
FileMetadata list (if duplicated)
+   */
+  protected void getFileMetadataToUpdate(List<Map<String, Object>> 
couchDbRawFileMetadata) {
+    couchDbRawFileMetadata.forEach(
+        rawFileMetadata -> checkDuplicateOriginalFilename(rawFileMetadata));
+  }
+
+  /**
+   * Fetches all fileIds stored in CouchDB
+   */
+  private List<String> getAllFileIds(IFileMetadataStorage fileMetadataStorage) 
{
+    return 
fileMetadataStorage.getAllFileMetadataDescriptions().stream().map(fileMetadata 
-> fileMetadata.getFileId())
+        .toList();
+  }
+
+  /**
+   * Takes the list of fileIds and searches for their raw metadata in CouchDB 
and returns them
+   */
+  private List<Map<String, Object>> getCouchDbRawFileMetadata(List<String> 
fileIds) {
+    return fileIds.stream()
+        .map(fileId -> convertInputStreamToMap(couchDbClient.find(fileId)))
+        .toList();
+  }
+
+  /**
+   * Converts InputStream (as stored in CouchDB) to Map, if there's an error, 
constructs a new Map
+   */
+  private Map<String, Object> convertInputStreamToMap(InputStream inputStream) 
{
+    try {
+      return mapper.readValue(inputStream, Map.class);
+    } catch (Exception e) {
+      return new HashMap<>();

Review Comment:
   Just added a log message. By the way, even if this exception occurs, the 
migration of the remaining files will not be affected, since the affected file 
will simply be skipped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to