>From Hussain Towaileb <[email protected]>:

Hussain Towaileb has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18711 )


Change subject: Add capability to calculate collection size
......................................................................

Add capability to calculate collection size

Change-Id: I02fb8ff0a87ff272aba05a676ccfb46ac370ebd2
---
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeResponseMessage.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeResponseMessage.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeRequestMessage.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeRequestMessage.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
7 files changed, 345 insertions(+), 8 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/11/18711/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeRequestMessage.java
new file mode 100644
index 0000000..1baff0f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeRequestMessage.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CalculateCollectionSizeRequestMessage implements 
ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final long reqId;
+    private final String fqn;
+
+    public CalculateCollectionSizeRequestMessage(String nodeId, long reqId, 
String fqn) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+        this.fqn = fqn;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) {
+        try {
+            long size = 0;
+            IClusterStateManager csm = appCtx.getClusterStateManager();
+            if (csm.getState() == IClusterManagementWork.ClusterState.ACTIVE
+                    || csm.getState() == 
IClusterManagementWork.ClusterState.REBALANCE_REQUIRED) {
+                final List<String> ncs = new 
ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+                CCMessageBroker messageBroker = (CCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+
+                long reqId = messageBroker.newRequestId();
+                List<CollectionSizeRequestMessage> requests = new 
ArrayList<>();
+                for (int i = 0; i < ncs.size(); i++) {
+                    requests.add(new CollectionSizeRequestMessage(reqId, fqn));
+                }
+                size = (long) messageBroker.sendSyncRequestToNCs(reqId, ncs, 
requests, TimeUnit.SECONDS.toMillis(60),
+                        true);
+
+                CalculateCollectionSizeResponseMessage response =
+                        new CalculateCollectionSizeResponseMessage(this.reqId, 
size);
+                messageBroker.sendApplicationMessageToNC(response, nodeId);
+            }
+        } catch (Exception ex) {
+            // TODO(htowaileb): proper error message
+            throw new IllegalStateException(ex);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeResponseMessage.java
new file mode 100644
index 0000000..eaf5975
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CalculateCollectionSizeResponseMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class CalculateCollectionSizeResponseMessage implements 
INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final long size;
+
+    public CalculateCollectionSizeResponseMessage(long reqId, long size) {
+        this.reqId = reqId;
+        this.size = size;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(reqId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+
+    public long getSize() {
+        return size;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeRequestMessage.java
new file mode 100644
index 0000000..6b9cb4a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeRequestMessage.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CollectionSizeRequestMessage extends CcIdentifiedMessage 
implements INcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final String fqn;
+
+    public CollectionSizeRequestMessage(long reqId, String fqn) {
+        this.reqId = reqId;
+        this.fqn = fqn;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        try {
+            IIOManager ioManager = appCtx.getPersistenceIoManager();
+            CollectionSizeResponseMessage response =
+                    new CollectionSizeResponseMessage(reqId, 
ioManager.getCollectionSize(fqn), null);
+            respond(appCtx, response);
+        } catch (Exception e) {
+            // TODO(htowaileb): proper error message
+            LOGGER.error("failed to get collection size", e);
+            CollectionSizeResponseMessage response = new 
CollectionSizeResponseMessage(reqId, 0, e);
+            respond(appCtx, response);
+        }
+    }
+
+    private void respond(INcApplicationContext appCtx, 
CollectionSizeResponseMessage response) {
+        NCMessageBroker messageBroker = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendMessageToPrimaryCC(response);
+        } catch (Exception e) {
+            // TODO(htowaileb): proper error message
+            LOGGER.error("failed to send collection size to cc", e);
+        }
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeResponseMessage.java
new file mode 100644
index 0000000..023a7a6
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CollectionSizeResponseMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker.ResponseState;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INcResponse;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+public class CollectionSizeResponseMessage implements ICcAddressedMessage, 
INcResponse {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final long collectionSize;
+    private final Throwable failure;
+
+    public CollectionSizeResponseMessage(long reqId, long collectionSize, 
Throwable failure) {
+        this.reqId = reqId;
+        this.collectionSize = collectionSize;
+        this.failure = failure;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) {
+        ICCMessageBroker broker = (ICCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        broker.respond(reqId, this);
+    }
+
+    @Override
+    public void setResult(MutablePair<ResponseState, Object> result) {
+        if (failure != null) {
+            result.setLeft(ResponseState.FAILURE);
+            result.setRight(failure);
+            return;
+        }
+        setResponse(result);
+    }
+
+    private void setResponse(MutablePair<ResponseState, Object> result) {
+        switch (result.getKey()) {
+            case SUCCESS:
+                long currentSize = (long) result.getValue();
+                result.setValue(currentSize + collectionSize);
+                break;
+            case UNINITIALIZED:
+                result.setLeft(ResponseState.SUCCESS);
+                result.setValue(collectionSize);
+                break;
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index c005253..a006dec 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -30,6 +30,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Predicate;

 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
 import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
@@ -68,6 +69,8 @@
 public abstract class AbstractCloudIOManager extends IOManager implements 
IPartitionBootstrapper, ICloudIOManager {
     private static final Logger LOGGER = LogManager.getLogger();
     private static final byte[] EMPTY_FILE_BYTES = "empty".getBytes();
+    private static final Predicate<String> NO_OP_LIST_FILES_FILTER = (path) -> 
true;
+
     protected final ICloudClient cloudClient;
     protected final ICloudGuardian guardian;
     protected final IWriteBufferProvider writeBufferProvider;
@@ -376,7 +379,7 @@
     public final JsonNode listAsJson(ObjectMapper objectMapper) {
         ArrayNode objectsInfo = objectMapper.createArrayNode();
         try {
-            List<CloudFile> allFiles = list();
+            List<CloudFile> allFiles = list(NO_OP_LIST_FILES_FILTER);
             allFiles.sort((x, y) -> 
String.CASE_INSENSITIVE_ORDER.compare(x.getPath(), y.getPath()));
             for (CloudFile file : allFiles) {
                 ObjectNode objectInfo = objectsInfo.addObject();
@@ -393,7 +396,7 @@
         }
     }

-    private List<CloudFile> list() {
+    private List<CloudFile> list(Predicate<String> filter) {
         List<CloudFile> allFiles = new ArrayList<>();
         // get cached files (read from disk)
         for (IODeviceHandle deviceHandle : getIODevices()) {
@@ -409,7 +412,9 @@

             for (FileReference fileReference : deviceFiles) {
                 try {
-                    allFiles.add(CloudFile.of(fileReference.getRelativePath(), 
fileReference.getFile().length()));
+                    if (filter.test(fileReference.getRelativePath())) {
+                        
allFiles.add(CloudFile.of(fileReference.getRelativePath(), 
fileReference.getFile().length()));
+                    }
                 } catch (Throwable th) {
                     LOGGER.warn("Encountered issue for local storage file {}", 
fileReference.getRelativePath(), th);
                 }
@@ -418,7 +423,9 @@

         // get uncached files from uncached files tracker
         for (UncachedFileReference uncachedFile : getUncachedFiles()) {
-            allFiles.add(CloudFile.of(uncachedFile.getRelativePath(), 
uncachedFile.getSize()));
+            if (filter.test(uncachedFile.getRelativePath())) {
+                allFiles.add(CloudFile.of(uncachedFile.getRelativePath(), 
uncachedFile.getSize()));
+            }
         }
         return allFiles;
     }
@@ -468,10 +475,46 @@
     }

     public long getTotalRemoteStorageSizeForNodeBytes() {
-        long size = 0;
-        for (CloudFile file : list()) {
-            size += file.getSize();
+        return getResourceTotalSize(NO_OP_LIST_FILES_FILTER);
+    }
+
+    @Override
+    public long getCollectionSize(String fqn) {
+        return getResourceTotalSize(path -> path.contains(fqn));
+    }
+
+    private long getResourceTotalSize(Predicate<String> relativePathFilter) {
+        long totalSize = 0;
+
+        // get cached files (read from disk)
+        for (IODeviceHandle deviceHandle : getIODevices()) {
+            FileReference storageRoot = 
deviceHandle.createFileRef(STORAGE_ROOT_DIR_NAME);
+
+            Set<FileReference> deviceFiles;
+            try {
+                deviceFiles = localIoManager.list(storageRoot, 
IoUtil.NO_OP_FILTER);
+            } catch (Throwable th) {
+                LOGGER.warn("Failed to get local storage files for root {}", 
storageRoot.getRelativePath(), th);
+                continue;
+            }
+
+            for (FileReference fileReference : deviceFiles) {
+                try {
+                    if 
(relativePathFilter.test(fileReference.getRelativePath())) {
+                        totalSize += fileReference.getFile().length();
+                    }
+                } catch (Throwable th) {
+                    LOGGER.warn("Encountered issue for local storage file {}", 
fileReference.getRelativePath(), th);
+                }
+            }
         }
-        return size;
+
+        // get uncached files from uncached files tracker
+        for (UncachedFileReference uncachedFile : getUncachedFiles()) {
+            if (relativePathFilter.test(uncachedFile.getRelativePath())) {
+                totalSize += uncachedFile.getSize();
+            }
+        }
+        return totalSize;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index a6520c6..beca30d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -156,6 +156,14 @@
     long getSize(FileReference fileReference) throws HyracksDataException;
 
     /**
+     * Gets the size of the provided collection
+     *
+     * @param fqn fully qualified name of the collection
+     * @return resource size
+     */
+    long getCollectionSize(String fqn) throws HyracksDataException;
+
+    /**
      * Returns a new write channel
      *
      * @param fileHandle handle of the opened file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index b35111e..aac66ea 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -601,6 +601,13 @@
         ((AbstractBulkOperation) bulkOperation).performOperation();
     }

+    @Override
+    public long getCollectionSize(String fqn) {
+        // TODO(htowaileb): this should have similar implementation to 
AbstractCloudIOManager#getCollectionSize
+        // with minor modification, but return 0 for now.
+        return 0;
+    }
+
     public void setSpaceMaker(IDiskSpaceMaker spaceMaker) {
         this.spaceMaker = spaceMaker;
     }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18711
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I02fb8ff0a87ff272aba05a676ccfb46ac370ebd2
Gerrit-Change-Number: 18711
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-MessageType: newchange

Reply via email to