This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new b6b5cc4 Implementing generic resource handler and metadata ingestion
b6b5cc4 is described below
commit b6b5cc43be4931b427958e2863d6865b45b5c6ca
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Wed Mar 24 13:03:53 2021 -0400
Implementing generic resource handler and metadata ingestion
---
.../drms/api/handlers/ResourceServiceHandler.java | 99 +++++++++++++++++++++-
.../handlers/StoragePreferenceServiceHandler.java | 6 +-
.../drms/api/handlers/StorageServiceHandler.java | 2 +
.../drms/core/constants/ResourceConstants.java | 21 +++++
.../deserializer/GenericResourceDeserializer.java | 89 +++++++++++++++++++
.../main/proto/resource/DRMSResourceService.proto | 35 +++++++-
6 files changed, 247 insertions(+), 5 deletions(-)
diff --git
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index 4e34e7a..34efc05 100644
---
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -17,15 +17,78 @@
package org.apache.airavata.drms.api.handlers;
import com.google.protobuf.Empty;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
+import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
+import org.apache.airavata.datalake.drms.groups.FetchCurrentUserRequest;
+import org.apache.airavata.datalake.drms.groups.FetchCurrentUserResponse;
+import org.apache.airavata.datalake.drms.groups.GroupServiceGrpc;
+import org.apache.airavata.datalake.drms.groups.User;
+import org.apache.airavata.datalake.drms.resource.GenericResource;
import org.apache.airavata.datalake.drms.storage.*;
+import org.apache.airavata.drms.core.Neo4JConnector;
+import org.apache.airavata.drms.core.constants.ResourceConstants;
+import org.apache.airavata.drms.core.constants.StorageConstants;
+import
org.apache.airavata.drms.core.deserializer.AnyStoragePreferenceDeserializer;
+import org.apache.airavata.drms.core.deserializer.GenericResourceDeserializer;
import org.lognet.springboot.grpc.GRpcService;
+import org.neo4j.driver.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
@GRpcService
public class ResourceServiceHandler extends
ResourceServiceGrpc.ResourceServiceImplBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ResourceServiceHandler.class);
+
+ @Autowired
+ private Neo4JConnector neo4JConnector;
+
+
@org.springframework.beans.factory.annotation.Value("${group.service.host}")
+ private String groupServiceHost;
+
+
@org.springframework.beans.factory.annotation.Value("${group.service.port}")
+ private int groupServicePort;
+
+ private User getUser(DRMSServiceAuthToken authToken) {
+ ManagedChannel channel =
ManagedChannelBuilder.forAddress(groupServiceHost,
groupServicePort).usePlaintext().build();
+ GroupServiceGrpc.GroupServiceBlockingStub groupClient =
GroupServiceGrpc.newBlockingStub(channel);
+ FetchCurrentUserResponse userResponse = groupClient.fetchCurrentUser(
+
FetchCurrentUserRequest.newBuilder().setAuthToken(authToken).build());
+ return userResponse.getUser();
+ }
+
@Override
public void fetchResource(ResourceFetchRequest request,
StreamObserver<ResourceFetchResponse> responseObserver) {
- super.fetchResource(request, responseObserver);
+ User callUser = getUser(request.getAuthToken());
+
+ // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp),
+ List<Record> records = this.neo4JConnector.searchNodes(
+ "MATCH
(u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource),
" +
+
"(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) " +
+ "where res.resourceId = '" + request.getResourceId() +
"' and u.userId = '"
+ + callUser.getUserId() + "' return distinct res, sp,
s");
+
+ if (!records.isEmpty()) {
+ try {
+ List<GenericResource> genericResourceList =
GenericResourceDeserializer.deserializeList(records);
+
responseObserver.onNext(ResourceFetchResponse.newBuilder().setResource(genericResourceList.get(0)).build());
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+
+ logger.error("Errored while fetching resource with id {}",
request.getResourceId(), e);
+ responseObserver.onError(new Exception("Errored while fetching
resource with id "
+ + request.getResourceId() + ". Msg " +
e.getMessage()));
+ }
+ } else {
+ logger.error("Could not find a generic resource with id {}",
request.getResourceId());
+ responseObserver.onError(new Exception("Could not find a generic
resource with id "
+ + request.getResourceId()));
+ }
}
@Override
@@ -45,6 +108,38 @@ public class ResourceServiceHandler extends
ResourceServiceGrpc.ResourceServiceI
@Override
public void searchResource(ResourceSearchRequest request,
StreamObserver<ResourceSearchResponse> responseObserver) {
- super.searchResource(request, responseObserver);
+ User callUser = getUser(request.getAuthToken());
+
+ // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp),
+ List<Record> records = this.neo4JConnector.searchNodes(
+ "MATCH
(u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource),
" +
+
"(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) " +
+ "where u.userId = '" + callUser.getUserId() + "'
return distinct res, sp, s");
+ try {
+ List<GenericResource> genericResourceList =
GenericResourceDeserializer.deserializeList(records);
+ ResourceSearchResponse.Builder builder =
ResourceSearchResponse.newBuilder();
+ builder.addAllResources(genericResourceList);
+ responseObserver.onNext(builder.build());
+ responseObserver.onCompleted();
+
+ } catch (Exception e) {
+ logger.error("Errored while searching generic resources; Message:
{}", e.getMessage(), e);
+ responseObserver.onError(e);
+ }
+ }
+
+ @Override
+ public void addResourceMetadata(AddResourceMetadataRequest request,
StreamObserver<Empty> responseObserver) {
+ User callUser = getUser(request.getAuthToken());
+
this.neo4JConnector.createMetadataNode(ResourceConstants.RESOURCE_LABEL,
"resourceId",
+ request.getResourceId(), callUser.getUserId(),
+ request.getMetadata().getKey(),
request.getMetadata().getValue());
+ responseObserver.onNext(Empty.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void fetchResourceMetadata(FetchResourceMetadataRequest request,
StreamObserver<FetchResourceMetadataResponse> responseObserver) {
+ super.fetchResourceMetadata(request, responseObserver);
}
}
diff --git
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
index 3d7b44a..ab560c3 100644
---
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
+++
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
@@ -65,7 +65,8 @@ public class StoragePreferenceServiceHandler extends
StoragePreferenceServiceGrp
User callUser = getUser(request.getAuthToken());
List<Record> records = this.neo4JConnector.searchNodes(
- "MATCH
(u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)
" +
+ "MATCH
(u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference),
" +
+ "(u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp)
" +
"where sp.storagePreferenceId = '" +
request.getStoragePreferenceId() + "' and u.userId = '"
+ callUser.getUserId() + "' return distinct sp, s");
@@ -108,7 +109,8 @@ public class StoragePreferenceServiceHandler extends
StoragePreferenceServiceGrp
User callUser = getUser(request.getAuthToken());
List<Record> records = this.neo4JConnector.searchNodes(
- "MATCH
(u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)"
+
+ "MATCH
(u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference),
" +
+
"(u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp)" +
" where u.userId ='" + callUser.getUserId() + "'
return distinct sp, s");
try {
List<AnyStoragePreference> storagePrefList =
AnyStoragePreferenceDeserializer.deserializeList(records);
diff --git
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
index a806e70..a00b7ca 100644
---
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
+++
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
@@ -113,6 +113,8 @@ public class StorageServiceHandler extends
StorageServiceGrpc.StorageServiceImpl
this.neo4JConnector.createMetadataNode(StorageConstants.STORAGE_LABEL,
"storageId",
request.getStorageId(), callUser.getUserId(),
request.getKey(), request.getValue());
+ responseObserver.onNext(Empty.getDefaultInstance());
+ responseObserver.onCompleted();
}
@Override
diff --git
a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java
b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java
new file mode 100644
index 0000000..2c7e0e2
--- /dev/null
+++
b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.airavata.drms.core.constants;
+
+public class ResourceConstants {
+ public static final String RESOURCE_LABEL = "Resource";
+}
diff --git
a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java
b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java
new file mode 100644
index 0000000..c4f4d97
--- /dev/null
+++
b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.drms.core.deserializer;
+
+import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.AnyStorage;
+import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
+import org.apache.airavata.drms.core.constants.ResourceConstants;
+import org.apache.airavata.drms.core.constants.StorageConstants;
+import org.apache.airavata.drms.core.constants.StoragePreferenceConstants;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.internal.InternalRecord;
+import org.neo4j.driver.types.Node;
+import org.springframework.beans.BeanWrapper;
+import org.springframework.beans.PropertyAccessorFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class GenericResourceDeserializer {
+
+ public static List<GenericResource> deserializeList(List<Record>
neo4jRecords) throws Exception {
+ List<GenericResource> resourceList = new ArrayList<>();
+ for (Record record : neo4jRecords) {
+ InternalRecord internalRecord = (InternalRecord) record;
+ List<Value> values = internalRecord.values();
+ if (values.size() == 3) {
+ Value resourceValue = values.get(0);
+ Value prfValue = values.get(1);
+ Value stoValue = values.get(2);
+ Node resourceNode = resourceValue.asNode();
+ Node prefNode = prfValue.asNode();
+ Node stoNode = stoValue.asNode();
+ if (resourceNode.hasLabel(ResourceConstants.RESOURCE_LABEL) &&
+
prefNode.hasLabel(StoragePreferenceConstants.STORAGE_PREFERENCE_LABEL) &&
+ stoNode.hasLabel(StorageConstants.STORAGE_LABEL)) {
+
+ AnyStorage storage =
AnyStorageDeserializer.deriveStorageFromMap(stoNode.asMap());
+ AnyStoragePreference preference =
AnyStoragePreferenceDeserializer.deriveStoragePrefFromMap(
+ prefNode.asMap(), storage);
+ GenericResource genericResource =
deriveGenericResourceFromMap(resourceNode.asMap(), preference);
+ resourceList.add(genericResource);
+ }
+ }
+ }
+ return resourceList;
+ }
+
+ public static GenericResource deriveGenericResourceFromMap(Map<String,
Object> fixedMap,
+
AnyStoragePreference preference) throws Exception {
+
+ GenericResource.Builder genericResourceBuilder =
GenericResource.newBuilder();
+ setObjectFieldsUsingMap(genericResourceBuilder, fixedMap);
+ switch (preference.getStorageCase()){
+ case S3STORAGEPREFERENCE:
+
genericResourceBuilder.setS3Preference(preference.getS3StoragePreference());
+ break;
+ case SSHSTORAGEPREFERENCE:
+
genericResourceBuilder.setSshPreference(preference.getSshStoragePreference());
+ break;
+ }
+
+ return genericResourceBuilder.build();
+ }
+
+ private static void setObjectFieldsUsingMap(Object target, Map<String,
Object> values) {
+ for (String field :values.keySet()) {
+ BeanWrapper beanWrapper =
PropertyAccessorFactory.forBeanPropertyAccess(target);
+ beanWrapper.setPropertyValue(field, values.get(field));
+ }
+ }
+}
diff --git
a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
index 600974a..7251f12 100644
---
a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
+++
b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
@@ -72,9 +72,30 @@ message ResourceSearchRequest {
message ResourceSearchResponse {
org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1;
- repeated org.apache.airavata.datalake.drms.resource.GenericResource
storagesPreference = 2;
+ repeated org.apache.airavata.datalake.drms.resource.GenericResource
resources = 2;
}
+message Metadata {
+ string key = 1;
+ string value = 2;
+}
+
+message AddResourceMetadataRequest {
+ org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1;
+ string resourceId = 2;
+ Metadata metadata = 3;
+}
+
+message FetchResourceMetadataRequest {
+ org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1;
+ string resourceId = 2;
+}
+
+message FetchResourceMetadataResponse {
+ repeated Metadata metadata = 1;
+}
+
+
service ResourceService {
rpc fetchResource (ResourceFetchRequest) returns (ResourceFetchResponse) {
@@ -106,4 +127,16 @@ service ResourceService {
post: "/v1.0/api/drms/resource/searchPreference"
};
}
+
+ rpc addResourceMetadata (AddResourceMetadataRequest) returns
(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/v1.0/api/drms/resource/metadata"
+ };
+ }
+
+ rpc fetchResourceMetadata (FetchResourceMetadataRequest) returns
(FetchResourceMetadataResponse) {
+ option (google.api.http) = {
+ get: "/v1.0/api/drms/resource/metadata"
+ };
+ }
}
\ No newline at end of file