dongpodu opened a new issue, #2244:
URL: https://github.com/apache/incubator-hugegraph/issues/2244

   ### Bug Type (问题类型)
   
   other exception / error (其他异常报错)
   
   ### Before submit
   
   - [X] 我已经确认现有的 [Issues](https://github.com/apache/hugegraph/issues) 与 
[FAQ](https://hugegraph.apache.org/docs/guides/faq/) 中没有相同 / 重复问题 (I have 
confirmed and searched that there are no similar problems in the historical 
issue and documents)
   
   ### Environment (环境信息)
   
   - Server Version: 0.11.2 (Apache Release Version)
   - Backend: Cassandra
   - OS: Debian 9.8 (64 CPUs, 50 GB RAM)
   - Data Size:  xx vertices, xx edges <!-- (like 1000W 点, 9000W 边) -->
   
   
   ### Expected & Actual behavior (期望与实际表现)
   
   Two new endpoints were added on top of 0.11.2:
   1. A batch vertex/edge creation endpoint:

   ```java
   @POST
   @Timed(name = "batch-create")
   @DecompressInterceptor.Decompress
   @Path("batch")
   @StatusFilter.Status(StatusFilter.Status.CREATED)
   @Consumes(APPLICATION_JSON)
   @Produces(APPLICATION_JSON_WITH_CHARSET)
   @RolesAllowed({"admin", "$owner=$graph $action=vertex_write"})
   public String create(@Context HugeConfig config,
                        @Context GraphManager manager,
                        @PathParam("graph") String graph,
                        @QueryParam("skip_error") @DefaultValue("true")
                        boolean skipError,
                        String body) {
       JsonLine[] values = JsonUtil.fromJson(body, JsonLine[].class);
       HugeGraph g = graph(manager, graph);
       // Deduplicate vertices to avoid importing the same vertex twice
       Map<Id, JsonVertex> vertexs = new HashMap<>();
       for (JsonLine line : values) {
           line.in.id = getVertexId(g, line.in);
           vertexs.put((Id) line.in.id, line.in);
           line.out.id = getVertexId(g, line.out);
           vertexs.put((Id) line.out.id, line.out);
       }
       List<String> errors = Lists.newLinkedList();
       // Created vertices, keyed by id
       Map<Id, Vertex> setIds = new HashMap<>();
       this.commit(config, g, vertexs.size(), () -> {
           for (JsonVertex vertex : vertexs.values()) {
               try {
                   setIds.put((Id) vertex.id, g.addVertex(vertex.properties()));
               } catch (Exception e) {
                   LOG.error("create vertex error, vertex:{}",
                             JsonUtil.toJson(vertex), e);
                   String message = "vertex:" + ((Id) vertex.id).asString() +
                                    " create error:" + e.getMessage();
                   errors.add(message);
                   if (!skipError) {
                       throw new RuntimeException(message);
                   }
               }
           }
           return null;
       });
       // Then create the edges
       List<Id> edges = this.commit(config, g, values.length, () -> {
           List<Id> ids = new ArrayList<>(values.length);
           for (JsonLine line : values) {
               try {
                   Vertex srcVertex = setIds.get(line.in.id);
                   if (null == srcVertex) {
                       throw new RuntimeException(line.in.id + " not found");
                   }
                   Vertex tgtVertex = setIds.get(line.out.id);
                   if (null == tgtVertex) {
                       throw new RuntimeException(line.out.id + " not found");
                   }
                   Edge edge = srcVertex.addEdge(line.label, tgtVertex,
                                                 line.properties());
                   ids.add((Id) edge.id());
               } catch (Exception e) {
                   LOG.error("create edge error, edge:{}",
                             JsonUtil.toJson(line), e);
                   String message = "edge create error:" + e.getMessage();
                   errors.add(message);
                   if (!skipError) {
                       throw new RuntimeException(message);
                   }
               }
           }
           return ids;
       });
       Map<String, Object> result = Maps.newHashMap();
       result.put("errors", errors);
       return JsonSerializer.instance().writeMap(result);
   }
   ```
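
   Not part of the original report, but the dedup-then-commit idea in the endpoint above can be shown with a self-contained sketch (plain Java, hypothetical names; no HugeGraph types):

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   // Minimal model of the dedup step in the batch endpoint: both
   // endpoints of every edge line are keyed by id in a map, so a
   // vertex referenced by many edges is imported only once.
   class DedupDemo {

       // Each input "line" is a (source id, target id) pair.
       static Set<String> dedupVertices(List<String[]> lines) {
           Map<String, String> vertices = new LinkedHashMap<>();
           for (String[] line : lines) {
               vertices.put(line[0], line[0]); // repeated puts collapse duplicates
               vertices.put(line[1], line[1]);
           }
           return vertices.keySet();
       }

       public static void main(String[] args) {
           List<String[]> lines = List.of(new String[]{"a", "b"},
                                          new String[]{"a", "c"},
                                          new String[]{"b", "c"});
           // 3 edge lines reference only 3 distinct vertices
           System.out.println(dedupVertices(lines)); // prints [a, b, c]
       }
   }
   ```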
   
   2. A customized k-neighbor query endpoint:

   ```java
   @POST
   @Timed
   @Consumes(APPLICATION_JSON)
   @Produces(APPLICATION_JSON_WITH_CHARSET)
   public String post(@Context GraphManager manager,
                      @PathParam("graph") String graph,
                      Request request) throws Exception {
       E.checkArgumentNotNull(request, "The request body can't be null");
       E.checkArgument(request.step != null,
                       "The steps of request can't be null");
       E.checkArgument(CollectionUtils.isNotEmpty(request.sources),
                       "The sources of request can't be null");
       long startTime = System.currentTimeMillis();
       HugeGraph g = null;
       try {
           g = graph(manager, graph);
           EdgeStep step = new EdgeStep(g, request.step.direction,
                                        request.step.labels,
                                        request.step.properties,
                                        request.step.vertexProperties,
                                        request.step.degree,
                                        request.step.skipDegree);

           Tuple3<Set<HugeNewTraverser.Node>, Map<Id, Integer>, Set<Edge>> tuple3;
           try (KneighborNewTraverser traverser = new KneighborNewTraverser(g)) {
               Set<Id> sourceIds = request.sources.stream()
                       .map(HugeVertex::getIdValue)
                       .filter(Objects::nonNull)
                       .collect(Collectors.toSet());
               tuple3 = traverser.customizedKneighborV3(sourceIds, step,
                                                        request.maxDepth,
                                                        request.limit);
           } finally {
               long endTime = System.currentTimeMillis();
               log.info("kneighbor_v3 query cost: {}", endTime - startTime);
           }

           Map<String, Map> vertexMap = new ConcurrentHashMap();
           HugeGraph finalG = g;
           long s = System.currentTimeMillis();
           tuple3.getFirst().parallelStream().forEach(node -> {
               try {
                   HugeVertex vertex = (HugeVertex) finalG.vertex(node.id());
                   Set<String> keys = request.vertexProperty.get(vertex.label());
                   if (CollectionUtils.isEmpty(keys)) {
                       Map map = vertex.getProperties().values().stream()
                               .collect(Collectors.toMap(HugeProperty::key,
                                                         HugeProperty::value));
                       vertexMap.put(vertex.id().toString(),
                               Maps.of("id", vertex.id(),
                                       "label", vertex.label(),
                                       "type", vertex.type().name().toLowerCase(),
                                       "depth", tuple3.getSecond().get(vertex.id()),
                                       "properties", map));
                   } else {
                       Map map = new HashMap();
                       keys.stream().filter(e -> e.indexOf("properties") > -1)
                           .map(e -> e.substring(11))
                           .forEach(k -> map.put(k, vertex.value(k)));
                       Map vmap = new HashMap();
                       if (keys.contains("id")) {
                           vmap.put("id", vertex.id());
                       }
                       if (keys.contains("label")) {
                           vmap.put("label", vertex.label());
                       }
                       if (keys.contains("type")) {
                           vmap.put("type", vertex.type().name().toLowerCase());
                       }
                       if (keys.contains("depth")) {
                           vmap.put("depth", tuple3.getSecond().get(vertex.id()));
                       }
                       vmap.put("properties", map);
                       vertexMap.put(vertex.id().toString(), vmap);
                   }
               } catch (Exception e) {
                   log.error("build vertex error, node:{}", node, e);
               }
           });
           long s1 = System.currentTimeMillis();
           log.info("s-s1 cost:{}ms", s1 - s);
           // Edges
           Map<String, Map> edgeMap = new ConcurrentHashMap();
           tuple3.getThird().parallelStream().forEach(e -> {
               HugeEdge eg = (HugeEdge) e;
               Set<String> keys = request.edgeProperty.get(eg.label());
               if (CollectionUtils.isEmpty(keys)) {
                   Map map = eg.getProperties().values().stream()
                           .collect(Collectors.toMap(HugeProperty::key,
                                                     HugeProperty::value));
                   edgeMap.put(e.id().toString(),
                           Maps.of("id", e.id(),
                                   "label", e.label(),
                                   "type", "edge",
                                   "outV", eg.outVertex().id(),
                                   "outVLabel", ((HugeVertex) eg.outVertex())
                                                .type().name().toLowerCase(),
                                   "inV", eg.inVertex().id(),
                                   "inVLabel", ((HugeVertex) eg.inVertex())
                                               .type().name().toLowerCase(),
                                   "properties", map));
               } else {
                   Map map = new HashMap();
                   keys.stream().filter(k -> k.indexOf("properties") > -1)
                       .map(k -> k.substring(11))
                       .forEach(k -> map.put(k, eg.value(k)));
                   Map vmap = new HashMap();
                   if (keys.contains("id")) {
                       vmap.put("id", eg.id());
                   }
                   if (keys.contains("label")) {
                       vmap.put("label", eg.label());
                   }
                   if (keys.contains("type")) {
                       vmap.put("type", "edge");
                   }
                   if (keys.contains("outV")) {
                       vmap.put("outV", eg.outVertex().id());
                   }
                   if (keys.contains("outVLabel")) {
                       vmap.put("outVLabel", ((HugeVertex) eg.outVertex())
                                             .type().name().toLowerCase());
                   }
                   if (keys.contains("inV")) {
                       vmap.put("inV", eg.inVertex().id());
                   }
                   if (keys.contains("inVLabel")) {
                       vmap.put("inVLabel", ((HugeVertex) eg.inVertex())
                                            .type().name().toLowerCase());
                   }
                   vmap.put("properties", map);
                   edgeMap.put(e.id().toString(), vmap);
               }
           });
           long s2 = System.currentTimeMillis();
           log.info("kneighbor_v3 s-s1:{}, s1-s2:{}ms", s1 - s, s2 - s1);
           return JsonSerializer.instance().writeMap(
                  Maps.of("vertexs", vertexMap.values(),
                          "edges", edgeMap.values()));
       } finally {
           long endTime = System.currentTimeMillis();
           log.info("kneighbor_v3 query cost: {}", endTime - startTime);
           if (g != null) {
               g.tx().commit();
           }
       }
   }
   ```
   The client only calls these two endpoints. After running for about 2 days, the server starts failing with `com.datastax.driver.core.exceptions.BusyPoolException: [/10.157.40.45] Pool is busy (no available connection and timed out after 5000 MILLISECONDS)`.
   Inspecting the heap with jmap shows that the number of Session instances keeps growing. How should I troubleshoot this?
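
   For context on the symptom (an illustrative sketch, not HugeGraph or driver code): `BusyPoolException` is thrown when every connection in the driver's per-host pool is checked out and a new request times out waiting for one, so sessions that are opened per request and never closed will eventually exhaust the pool. A minimal plain-Java model of that failure mode, where a `Semaphore` stands in for the per-host pool and all names and sizes are hypothetical:

   ```java
   import java.util.concurrent.Semaphore;
   import java.util.concurrent.TimeUnit;

   // Minimal model of a fixed-size connection pool: acquiring without
   // ever releasing exhausts the permits, and later callers time out --
   // the same symptom BusyPoolException reports for the Cassandra
   // driver's per-host pool.
   class LeakyPoolDemo {
       static final int POOL_SIZE = 4;
       static final Semaphore pool = new Semaphore(POOL_SIZE);

       // Simulates a request that takes a connection and "forgets" to return it.
       static boolean acquireWithoutRelease(long timeoutMs)
               throws InterruptedException {
           return pool.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
       }

       public static void main(String[] args) throws InterruptedException {
           for (int i = 0; i < POOL_SIZE; i++) {
               System.out.println("request " + i + " got connection: "
                                  + acquireWithoutRelease(100)); // true
           }
           // Pool exhausted: the next acquire waits, then times out,
           // like "Pool is busy (no available connection and timed out ...)".
           System.out.println("request " + POOL_SIZE + " got connection: "
                              + acquireWithoutRelease(100)); // false
       }
   }
   ```

   If this model matches what is happening, the thing to check is which code path creates a new Cassandra Session (or an object owning one) per request without closing it.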
   
   ### Vertex/Edge example (问题点 / 边数据举例)
   
   _No response_
   
   ### Schema [VertexLabel, EdgeLabel, IndexLabel] (元数据结构)
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
