[ 
https://issues.apache.org/jira/browse/GIRAPH-1188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444624#comment-16444624
 ] 

ASF GitHub Bot commented on GIRAPH-1188:
----------------------------------------

Github user yukselakinci commented on a diff in the pull request:

    https://github.com/apache/giraph/pull/70#discussion_r182850420
  
    --- Diff: 
giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java
 ---
    @@ -0,0 +1,308 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.giraph.writable.kryo;
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.KryoException;
    +import com.esotericsoftware.kryo.Registration;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import com.esotericsoftware.kryo.util.DefaultClassResolver;
    +import com.esotericsoftware.kryo.util.ObjectMap;
    +import org.apache.giraph.zk.ZooKeeperExt;
    +import org.apache.zookeeper.CreateMode;
    +import org.apache.zookeeper.KeeperException;
    +import org.apache.zookeeper.ZooDefs;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.List;
    +
    +import static com.esotericsoftware.kryo.util.Util.getWrapperClass;
    +
    +/**
    + * This class resolver assigns unique classIds for every class that was not
    + * explicitly registered. It uses zookeeper for consistent mapping across 
all
    + * nodes.
    + */
    +public class GiraphClassResolver extends DefaultClassResolver {
    +  /** Length of the ZK sequence number */
    +  private static final int SEQUENCE_NUMBER_LENGTH = 10;
    +  /** Base ID to start for class name assignments.
    +   * This number has to be high enough to not conflict with
    +   * explicity registered class IDs.
    +   * */
    +  private static final int BASE_CLASS_ID = 10000;
    +
    +  /** Class name to ID cache */
    +  private static Map<String, Integer> CLASS_NAME_TO_ID = new HashMap();
    +  /** ID to class name cache */
    +  private static Map<Integer, String> ID_TO_CLASS_NAME = new HashMap();
    +  /** Zookeeper */
    +  private static ZooKeeperExt ZK;
    +  /** Zookeeper path for automatic class registrations */
    +  private static String KRYO_REGISTERED_CLASS_PATH;
    +  /** Minimum class ID assigned by zookeeper sequencing */
    +  private static int MIN_CLASS_ID = -1;
    +
    +  /** Memoized class id*/
    +  private int memoizedClassId = -1;
    +  /** Memoized class registration */
    +  private Registration memoizedClassIdValue;
    +
    +  /**
    +   * Sets zookeeper informaton.
    +   * @param zookeeperExt ZookeeperExt
    +   * @param kryoClassPath Zookeeper directory path where class Name-ID
    +   *                      mapping is stored.
    +   */
    +  public static void setZookeeperInfo(ZooKeeperExt zookeeperExt,
    +                                      String kryoClassPath) {
    +    ZK = zookeeperExt;
    +    KRYO_REGISTERED_CLASS_PATH = kryoClassPath;
    +  }
    +
    +  /**
    +   * Creates a new node for the given class name.
    +   * Creation mode is persistent sequential, i.e.
    +   * ZK will always create a new node . There could be
    +   * multiple entries for the same class name but since
    +   * the lowest index is used, this is not a problem.
    +   * @param className Class name
    +   */
    +  public static void createClassName(String className) {
    +    try {
    +      String path = KRYO_REGISTERED_CLASS_PATH + "/" + className;
    +      ZK.createExt(path,
    +              null,
    +              ZooDefs.Ids.OPEN_ACL_UNSAFE,
    +              CreateMode.PERSISTENT_SEQUENTIAL,
    +              true);
    +    } catch (KeeperException e) {
    +      throw new IllegalStateException(
    +              "Failed to create class " + className, e);
    +    } catch (InterruptedException e) {
    +      throw new IllegalStateException(
    +              "Interrupted while creating " + className, e);
    +    }
    +  }
    +
    +  /**
    +   * Refreshes class-ID mapping from zookeeper.
    +   * Not thread safe.
    +   */
    +  public static void refreshCache() {
    +    try {
    +      ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH,
    +              null,
    +              ZooDefs.Ids.OPEN_ACL_UNSAFE,
    +              CreateMode.PERSISTENT,
    +              true);
    +    } catch (KeeperException e) {
    +      throw new IllegalStateException(
    +              "Failed to refresh kryo cache " +
    +                      KRYO_REGISTERED_CLASS_PATH, e);
    +    } catch (InterruptedException e) {
    +      throw new IllegalStateException(
    +              "Interrupted while refreshing kryo cache " +
    +                      KRYO_REGISTERED_CLASS_PATH, e);
    +    }
    +
    +    List<String> registeredList;
    +    try {
    +      registeredList =
    +              ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH,
    +                      false,
    +                      true,
    +                      false);
    +    } catch (KeeperException e) {
    +      throw new IllegalStateException(
    +        "Failed to retrieve child nodes for " + 
KRYO_REGISTERED_CLASS_PATH, e);
    +    } catch (InterruptedException e) {
    +      throw new IllegalStateException(
    +        "Interrupted while retrieving child nodes for " +
    +                KRYO_REGISTERED_CLASS_PATH, e);
    +    }
    +
    +    for (String name : registeredList) {
    +      String className = name.substring(0,
    +              name.length() - SEQUENCE_NUMBER_LENGTH);
    +      int classId = Integer.parseInt(
    +              name.substring(name.length() - SEQUENCE_NUMBER_LENGTH));
    +
    +      if (MIN_CLASS_ID == -1) {
    +        MIN_CLASS_ID = classId;
    +      }
    +
    +      int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID;
    +      if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) {
    +        ID_TO_CLASS_NAME.put(adjustedId, className);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Gets ID for the given class name.
    +   * @param className Class name
    +   * @return class id Class ID
    +   */
    +  public static int getClassId(String className) {
    +    if (CLASS_NAME_TO_ID.containsKey(className)) {
    +      return CLASS_NAME_TO_ID.get(className);
    +    }
    +    synchronized (GiraphClassResolver.class) {
    +      if (CLASS_NAME_TO_ID.containsKey(className)) {
    +        return CLASS_NAME_TO_ID.get(className);
    +      }
    +      refreshCache();
    +
    +      if (!CLASS_NAME_TO_ID.containsKey(className)) {
    +        createClassName(className);
    +        refreshCache();
    +      }
    +    }
    +
    +    if (!CLASS_NAME_TO_ID.containsKey(className)) {
    +      throw new IllegalStateException("Failed to assigned id to " + 
className);
    +    }
    +
    +    return CLASS_NAME_TO_ID.get(className);
    +  }
    +
    +  /**
    +   * Get class name for given ID.
    +   * @param id class ID
    +   * @return class name
    +   */
    +  public static String getClassName(int id) {
    +    if (ID_TO_CLASS_NAME.containsKey(id)) {
    +      return ID_TO_CLASS_NAME.get(id);
    +    }
    +    synchronized (GiraphClassResolver.class) {
    +      if (ID_TO_CLASS_NAME.containsKey(id)) {
    +        return ID_TO_CLASS_NAME.get(id);
    +      }
    +      refreshCache();
    +    }
    +
    +    if (!ID_TO_CLASS_NAME.containsKey(id)) {
    +      throw new IllegalStateException("ID " + id + " doesn't exist");
    +    }
    +    return ID_TO_CLASS_NAME.get(id);
    +  }
    +
    +  @Override
    +  public Registration register(Registration registration) {
    --- End diff --
    
    The default implementation also handles the case when the class is 
non-explicitly registered, i.e. it is not assigned to an an integer. Since in 
this implementation all classes are assigned to integers agreed by all kryo 
instances, they all treated like explicitly assigned classes. This 
implementation throws an exception if it encounters a class that has not been 
assigned to an integer yet.  


> Add kryo custom class resolver for faster serialization.
> --------------------------------------------------------
>
>                 Key: GIRAPH-1188
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-1188
>             Project: Giraph
>          Issue Type: Improvement
>            Reporter: Yuksel Akinci
>            Priority: Major
>
> The custom class resolver allows kryo to always use the integer IDs for 
> classes that are agreed by all nodes running the job. Default class resolver 
> always writes the full class name of  the first encountered class type to the 
> stream, and then it assigns an integer for subsequent instances. These 
> changes make the serialization faster by eliminating the need to write the 
> full name for the first encountered class instance. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to