[ 
https://issues.apache.org/jira/browse/TAJO-269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14260935#comment-14260935
 ] 

ASF GitHub Bot commented on TAJO-269:
-------------------------------------

Github user jihoonson commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/322#discussion_r22342445
  
    --- Diff: 
tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
 ---
    @@ -0,0 +1,678 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.plan.serder;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.tajo.OverridableConf;
    +import org.apache.tajo.algebra.JoinType;
    +import org.apache.tajo.catalog.Column;
    +import org.apache.tajo.catalog.Schema;
    +import org.apache.tajo.catalog.SortSpec;
    +import org.apache.tajo.catalog.TableDesc;
    +import org.apache.tajo.catalog.partition.PartitionMethodDesc;
    +import org.apache.tajo.catalog.proto.CatalogProtos;
    +import org.apache.tajo.exception.UnimplementedException;
    +import org.apache.tajo.plan.Target;
    +import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
    +import org.apache.tajo.plan.expr.EvalNode;
    +import org.apache.tajo.plan.expr.FieldEval;
    +import org.apache.tajo.plan.expr.WindowFunctionEval;
    +import org.apache.tajo.plan.logical.*;
    +import org.apache.tajo.util.KeyValueSet;
    +import org.apache.tajo.util.TUtil;
    +
    +import java.util.*;
    +
    +/**
    + * It deserializes a list of serialized logical nodes into a logical node 
tree.
    + */
    +public class LogicalNodeDeserializer {
    +  private static final LogicalNodeDeserializer instance;
    +
    +  static {
    +    instance = new LogicalNodeDeserializer();
    +  }
    +
    +  /**
    +   * Deserialize a list of nodes into a logical node tree.
    +   *
    +   * @param context QueryContext
    +   * @param tree LogicalNodeTree which contains a list of serialized 
logical nodes.
    +   * @return A logical node tree
    +   */
    +  public static LogicalNode deserialize(OverridableConf context, 
PlanProto.LogicalNodeTree tree) {
    +    Map<Integer, LogicalNode> nodeMap = Maps.newHashMap();
    +
    +    // sort serialized logical nodes in an ascending order of their sids
    +    List<PlanProto.LogicalNode> nodeList = 
Lists.newArrayList(tree.getNodesList());
    +    Collections.sort(nodeList, new Comparator<PlanProto.LogicalNode>() {
    +      @Override
    +      public int compare(PlanProto.LogicalNode o1, PlanProto.LogicalNode 
o2) {
    +        return o1.getSid() - o2.getSid();
    +      }
    +    });
    +
    +    LogicalNode current = null;
    +
    +    // The sorted order is the same as a postfix traversal order.
    +    // So, it sequentially transforms each serialized node into a 
LogicalNode instance in a postfix order of
    +    // the original logical node tree.
    +
    +    Iterator<PlanProto.LogicalNode> it = nodeList.iterator();
    +    while (it.hasNext()) {
    +      PlanProto.LogicalNode protoNode = it.next();
    +
    +      switch (protoNode.getType()) {
    +      case ROOT:
    +        current = convertRoot(nodeMap, protoNode);
    +        break;
    +      case SET_SESSION:
    +        current = convertSetSession(protoNode);
    +        break;
    +      case EXPRS:
    +        current = convertEvalExpr(context, protoNode);
    +        break;
    +      case PROJECTION:
    +        current = convertProjection(context, nodeMap, protoNode);
    +        break;
    +      case LIMIT:
    +        current = convertLimit(nodeMap, protoNode);
    +        break;
    +      case SORT:
    +        current = convertSort(nodeMap, protoNode);
    +        break;
    +      case WINDOW_AGG:
    +        current = convertWindowAgg(context, nodeMap, protoNode);
    +        break;
    +      case HAVING:
    +        current = convertHaving(context, nodeMap, protoNode);
    +        break;
    +      case GROUP_BY:
    +        current = convertGroupby(context, nodeMap, protoNode);
    +        break;
    +      case DISTINCT_GROUP_BY:
    +        current = convertDistinctGroupby(context, nodeMap, protoNode);
    +        break;
    +      case SELECTION:
    +        current = convertFilter(context, nodeMap, protoNode);
    +        break;
    +      case JOIN:
    +        current = convertJoin(context, nodeMap, protoNode);
    +        break;
    +      case TABLE_SUBQUERY:
    +        current = convertTableSubQuery(context, nodeMap, protoNode);
    +        break;
    +      case UNION:
    +        current = convertUnion(nodeMap, protoNode);
    +        break;
    +      case PARTITIONS_SCAN:
    +        current = convertPartitionScan(context, protoNode);
    +        break;
    +      case SCAN:
    +        current = convertScan(context, protoNode);
    +        break;
    +
    +      case CREATE_TABLE:
    +        current = convertCreateTable(nodeMap, protoNode);
    +        break;
    +      case INSERT:
    +        current = convertInsert(nodeMap, protoNode);
    +        break;
    +      case DROP_TABLE:
    +        current = convertDropTable(protoNode);
    +        break;
    +
    +      case CREATE_DATABASE:
    +        current = convertCreateDatabase(protoNode);
    +        break;
    +      case DROP_DATABASE:
    +        current = convertDropDatabase(protoNode);
    +        break;
    +
    +      case ALTER_TABLESPACE:
    +        current = convertAlterTablespace(protoNode);
    +        break;
    +      case ALTER_TABLE:
    +        current = convertAlterTable(protoNode);
    +        break;
    +      case TRUNCATE_TABLE:
    +        current = convertTruncateTable(protoNode);
    +        break;
    +
    +      default:
    +        throw new RuntimeException("Unknown NodeType: " + 
protoNode.getType().name());
    +      }
    +
    +      nodeMap.put(protoNode.getSid(), current);
    +    }
    +
    +    return current;
    +  }
    +
    +  private static LogicalRootNode convertRoot(Map<Integer, LogicalNode> 
nodeMap,
    +                                            PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.RootNode rootProto = protoNode.getRoot();
    +
    +    LogicalRootNode root = new LogicalRootNode(protoNode.getPid());
    +    root.setChild(nodeMap.get(rootProto.getChildId()));
    +    if (protoNode.hasInSchema()) {
    +      root.setInSchema(convertSchema(protoNode.getInSchema()));
    +    }
    +    if (protoNode.hasOutSchema()) {
    +      root.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    }
    +
    +    return root;
    +  }
    +
    +  private static SetSessionNode convertSetSession(PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.SetSessionNode setSessionProto = protoNode.getSetSession();
    +
    +    SetSessionNode setSession = new SetSessionNode(protoNode.getPid());
    +    setSession.init(setSessionProto.getName(), setSessionProto.hasValue() 
? setSessionProto.getValue() : null);
    +
    +    return setSession;
    +  }
    +
    +  private static EvalExprNode convertEvalExpr(OverridableConf context, 
PlanProto.LogicalNode protoNode) {
    +    PlanProto.EvalExprNode evalExprProto = protoNode.getExprEval();
    +
    +    EvalExprNode evalExpr = new EvalExprNode(protoNode.getPid());
    +    evalExpr.setInSchema(convertSchema(protoNode.getInSchema()));
    +    evalExpr.setTargets(convertTargets(context, 
evalExprProto.getTargetsList()));
    +
    +    return evalExpr;
    +  }
    +
    +  private static ProjectionNode convertProjection(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                                 PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.ProjectionNode projectionProto = protoNode.getProjection();
    +
    +    ProjectionNode projectionNode = new ProjectionNode(protoNode.getPid());
    +    projectionNode.init(projectionProto.getDistinct(), 
convertTargets(context, projectionProto.getTargetsList()));
    +    projectionNode.setChild(nodeMap.get(projectionProto.getChildId()));
    +    projectionNode.setInSchema(convertSchema(protoNode.getInSchema()));
    +    projectionNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +
    +    return projectionNode;
    +  }
    +
    +  private static LimitNode convertLimit(Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
    +    PlanProto.LimitNode limitProto = protoNode.getLimit();
    +
    +    LimitNode limitNode = new LimitNode(protoNode.getPid());
    +    limitNode.setChild(nodeMap.get(limitProto.getChildId()));
    +    limitNode.setInSchema(convertSchema(protoNode.getInSchema()));
    +    limitNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    limitNode.setFetchFirst(limitProto.getFetchFirstNum());
    +
    +    return limitNode;
    +  }
    +
    +  private static SortNode convertSort(Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
    +    PlanProto.SortNode sortProto = protoNode.getSort();
    +
    +    SortNode sortNode = new SortNode(protoNode.getPid());
    +    sortNode.setChild(nodeMap.get(sortProto.getChildId()));
    +    sortNode.setInSchema(convertSchema(protoNode.getInSchema()));
    +    sortNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    sortNode.setSortSpecs(convertSortSpecs(sortProto.getSortSpecsList()));
    +
    +    return sortNode;
    +  }
    +
    +  private static HavingNode convertHaving(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                         PlanProto.LogicalNode protoNode) {
    +    PlanProto.FilterNode havingProto = protoNode.getFilter();
    +
    +    HavingNode having = new HavingNode(protoNode.getPid());
    +    having.setChild(nodeMap.get(havingProto.getChildId()));
    +    having.setQual(EvalNodeDeserializer.deserialize(context, 
havingProto.getQual()));
    +    having.setInSchema(convertSchema(protoNode.getInSchema()));
    +    having.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +
    +    return having;
    +  }
    +
    +  private static WindowAggNode convertWindowAgg(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                               PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.WindowAggNode windowAggProto = protoNode.getWindowAgg();
    +
    +    WindowAggNode windowAgg = new WindowAggNode(protoNode.getPid());
    +    windowAgg.setChild(nodeMap.get(windowAggProto.getChildId()));
    +
    +    if (windowAggProto.getPartitionKeysCount() > 0) {
    +      
windowAgg.setPartitionKeys(convertColumns(windowAggProto.getPartitionKeysList()));
    +    }
    +
    +    if (windowAggProto.getWindowFunctionsCount() > 0) {
    +      windowAgg.setWindowFunctions(convertWindowFunccEvals(context, 
windowAggProto.getWindowFunctionsList()));
    +    }
    +
    +    windowAgg.setDistinct(windowAggProto.getDistinct());
    +
    +    if (windowAggProto.getSortSpecsCount() > 0) {
    +      
windowAgg.setSortSpecs(convertSortSpecs(windowAggProto.getSortSpecsList()));
    +    }
    +
    +    if (windowAggProto.getTargetsCount() > 0) {
    +      windowAgg.setTargets(convertTargets(context, 
windowAggProto.getTargetsList()));
    +    }
    +
    +    windowAgg.setInSchema(convertSchema(protoNode.getInSchema()));
    +    windowAgg.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +
    +    return windowAgg;
    +  }
    +
    +  private static GroupbyNode convertGroupby(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                           PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.GroupbyNode groupbyProto = protoNode.getGroupby();
    +
    +    GroupbyNode groupby = new GroupbyNode(protoNode.getPid());
    +    groupby.setChild(nodeMap.get(groupbyProto.getChildId()));
    +    groupby.setDistinct(groupbyProto.getDistinct());
    +
    +    if (groupbyProto.getGroupingKeysCount() > 0) {
    +      
groupby.setGroupingColumns(convertColumns(groupbyProto.getGroupingKeysList()));
    +    }
    +    if (groupbyProto.getAggFunctionsCount() > 0) {
    +      groupby.setAggFunctions(convertAggFuncCallEvals(context, 
groupbyProto.getAggFunctionsList()));
    +    }
    +    if (groupbyProto.getTargetsCount() > 0) {
    +      groupby.setTargets(convertTargets(context, 
groupbyProto.getTargetsList()));
    +    }
    +
    +    groupby.setInSchema(convertSchema(protoNode.getInSchema()));
    +    groupby.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +
    +    return groupby;
    +  }
    +
    +  private static DistinctGroupbyNode 
convertDistinctGroupby(OverridableConf context, Map<Integer, LogicalNode> 
nodeMap,
    +                                           PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.DistinctGroupbyNode distinctGroupbyProto = 
protoNode.getDistinctGroupby();
    +
    +    DistinctGroupbyNode distinctGroupby = new 
DistinctGroupbyNode(protoNode.getPid());
    +    
distinctGroupby.setChild(nodeMap.get(distinctGroupbyProto.getChildId()));
    +
    +    if (distinctGroupbyProto.hasGroupbyNode()) {
    +      distinctGroupby.setGroupbyPlan(convertGroupby(context, nodeMap, 
distinctGroupbyProto.getGroupbyNode()));
    +    }
    +
    +    if (distinctGroupbyProto.getSubPlansCount() > 0) {
    +      List<GroupbyNode> subPlans = TUtil.newList();
    +      for (int i = 0; i < distinctGroupbyProto.getSubPlansCount(); i++) {
    +        subPlans.add(convertGroupby(context, nodeMap, 
distinctGroupbyProto.getSubPlans(i)));
    +      }
    +      distinctGroupby.setSubPlans(subPlans);
    +    }
    +
    +    if (distinctGroupbyProto.getGroupingKeysCount() > 0) {
    +      
distinctGroupby.setGroupingColumns(convertColumns(distinctGroupbyProto.getGroupingKeysList()));
    +    }
    +    if (distinctGroupbyProto.getAggFunctionsCount() > 0) {
    +      distinctGroupby.setAggFunctions(convertAggFuncCallEvals(context, 
distinctGroupbyProto.getAggFunctionsList()));
    +    }
    +    if (distinctGroupbyProto.getTargetsCount() > 0) {
    +      distinctGroupby.setTargets(convertTargets(context, 
distinctGroupbyProto.getTargetsList()));
    +    }
    +    int [] resultColumnIds = new 
int[distinctGroupbyProto.getResultIdCount()];
    +    for (int i = 0; i < distinctGroupbyProto.getResultIdCount(); i++) {
    +      resultColumnIds[i] = distinctGroupbyProto.getResultId(i);
    +    }
    +    distinctGroupby.setResultColumnIds(resultColumnIds);
    +
    +    // TODO - in distinct groupby, output and target are not matched to 
each other. It does not follow the convention.
    +    distinctGroupby.setInSchema(convertSchema(protoNode.getInSchema()));
    +    distinctGroupby.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +
    +    return distinctGroupby;
    +  }
    +
    +  private static JoinNode convertJoin(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                     PlanProto.LogicalNode protoNode) {
    +    PlanProto.JoinNode joinProto = protoNode.getJoin();
    +
    +    JoinNode join = new JoinNode(protoNode.getPid());
    +    join.setLeftChild(nodeMap.get(joinProto.getLeftChildId()));
    +    join.setRightChild(nodeMap.get(joinProto.getRightChildId()));
    +    join.setJoinType(convertJoinType(joinProto.getJoinType()));
    +    join.setInSchema(convertSchema(protoNode.getInSchema()));
    +    join.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    if (joinProto.hasJoinQual()) {
    +      join.setJoinQual(EvalNodeDeserializer.deserialize(context, 
joinProto.getJoinQual()));
    +    }
    +    if (joinProto.getExistsTargets()) {
    +      join.setTargets(convertTargets(context, joinProto.getTargetsList()));
    +    }
    +
    +    return join;
    +  }
    +
    +  private static SelectionNode convertFilter(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                            PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.FilterNode filterProto = protoNode.getFilter();
    +
    +    SelectionNode selection = new SelectionNode(protoNode.getPid());
    +    selection.setInSchema(convertSchema(protoNode.getInSchema()));
    +    selection.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    selection.setChild(nodeMap.get(filterProto.getChildId()));
    +    selection.setQual(EvalNodeDeserializer.deserialize(context, 
filterProto.getQual()));
    +
    +    return selection;
    +  }
    +
    +  private static UnionNode convertUnion(Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
    +    PlanProto.UnionNode unionProto = protoNode.getUnion();
    +
    +    UnionNode union = new UnionNode(protoNode.getPid());
    +    union.setInSchema(convertSchema(protoNode.getInSchema()));
    +    union.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    union.setLeftChild(nodeMap.get(unionProto.getLeftChildId()));
    +    union.setRightChild(nodeMap.get(unionProto.getRightChildId()));
    +
    +    return union;
    +  }
    +
    +  private static ScanNode convertScan(OverridableConf context, 
PlanProto.LogicalNode protoNode) {
    +    ScanNode scan = new ScanNode(protoNode.getPid());
    +    fillScanNode(context, protoNode, scan);
    +
    +    return scan;
    +  }
    +
    +  private static void fillScanNode(OverridableConf context, 
PlanProto.LogicalNode protoNode, ScanNode scan) {
    +    PlanProto.ScanNode scanProto = protoNode.getScan();
    +    if (scanProto.hasAlias()) {
    +      scan.init(new TableDesc(scanProto.getTable()), scanProto.getAlias());
    +    } else {
    +      scan.init(new TableDesc(scanProto.getTable()));
    +    }
    +
    +    if (scanProto.getExistTargets()) {
    +      scan.setTargets(convertTargets(context, scanProto.getTargetsList()));
    +    }
    +
    +    if (scanProto.hasQual()) {
    +      scan.setQual(EvalNodeDeserializer.deserialize(context, 
scanProto.getQual()));
    +    }
    +
    +    scan.setInSchema(convertSchema(protoNode.getInSchema()));
    +    scan.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +  }
    +
    +  private static PartitionedTableScanNode 
convertPartitionScan(OverridableConf context, PlanProto.LogicalNode protoNode) {
    +    PartitionedTableScanNode partitionedScan = new 
PartitionedTableScanNode(protoNode.getPid());
    +    fillScanNode(context, protoNode, partitionedScan);
    +
    +    PlanProto.PartitionScanSpec partitionScanProto = 
protoNode.getPartitionScan();
    +    Path [] paths = new Path[partitionScanProto.getPathsCount()];
    +    for (int i = 0; i < partitionScanProto.getPathsCount(); i++) {
    +      paths[i] = new Path(partitionScanProto.getPaths(i));
    +    }
    +    partitionedScan.setInputPaths(paths);
    +    return partitionedScan;
    +  }
    +
    +  private static TableSubQueryNode convertTableSubQuery(OverridableConf 
context,
    +                                                                 
Map<Integer, LogicalNode> nodeMap,
    +                                                                 
PlanProto.LogicalNode protoNode) {
    +    PlanProto.TableSubQueryNode proto = protoNode.getTableSubQuery();
    +
    +    TableSubQueryNode tableSubQuery = new 
TableSubQueryNode(protoNode.getPid());
    +    tableSubQuery.init(proto.getTableName(), 
nodeMap.get(proto.getChildId()));
    +    tableSubQuery.setInSchema(convertSchema(protoNode.getInSchema()));
    +    if (proto.getTargetsCount() > 0) {
    +      tableSubQuery.setTargets(convertTargets(context, 
proto.getTargetsList()));
    +    }
    +
    +    return tableSubQuery;
    +  }
    +
    +  private static CreateTableNode convertCreateTable(Map<Integer, 
LogicalNode> nodeMap,
    +                                            PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.PersistentStoreNode persistentStoreProto = 
protoNode.getPersistentStore();
    +    PlanProto.StoreTableNodeSpec storeTableNodeSpec = 
protoNode.getStoreTable();
    +    PlanProto.CreateTableNodeSpec createTableNodeSpec = 
protoNode.getCreateTable();
    +
    +    CreateTableNode createTable = new CreateTableNode(protoNode.getPid());
    +    if (protoNode.hasInSchema()) {
    +      createTable.setInSchema(convertSchema(protoNode.getInSchema()));
    +    }
    +    if (protoNode.hasOutSchema()) {
    +      createTable.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    }
    +    createTable.setChild(nodeMap.get(persistentStoreProto.getChildId()));
    +    createTable.setStorageType(persistentStoreProto.getStorageType());
    +    createTable.setOptions(new 
KeyValueSet(persistentStoreProto.getTableProperties()));
    +
    +    createTable.setTableName(storeTableNodeSpec.getTableName());
    +    if (storeTableNodeSpec.hasPartitionMethod()) {
    +      createTable.setPartitionMethod(new 
PartitionMethodDesc(storeTableNodeSpec.getPartitionMethod()));
    +    }
    +
    +    
createTable.setTableSchema(convertSchema(createTableNodeSpec.getSchema()));
    +    createTable.setExternal(createTableNodeSpec.getExternal());
    +    if (createTableNodeSpec.getExternal() && 
createTableNodeSpec.hasPath()) {
    +      createTable.setPath(new Path(createTableNodeSpec.getPath()));
    +    }
    +    createTable.setIfNotExists(createTableNodeSpec.getIfNotExists());
    +
    +    return createTable;
    +  }
    +
    +  private static InsertNode convertInsert(Map<Integer, LogicalNode> 
nodeMap,
    +                                                   PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.PersistentStoreNode persistentStoreProto = 
protoNode.getPersistentStore();
    +    PlanProto.StoreTableNodeSpec storeTableNodeSpec = 
protoNode.getStoreTable();
    +    PlanProto.InsertNodeSpec insertNodeSpec = protoNode.getInsert();
    +
    +    InsertNode insertNode = new InsertNode(protoNode.getPid());
    +    if (protoNode.hasInSchema()) {
    +      insertNode.setInSchema(convertSchema(protoNode.getInSchema()));
    +    }
    +    if (protoNode.hasOutSchema()) {
    +      insertNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    }
    +    insertNode.setChild(nodeMap.get(persistentStoreProto.getChildId()));
    +    insertNode.setStorageType(persistentStoreProto.getStorageType());
    +    insertNode.setOptions(new 
KeyValueSet(persistentStoreProto.getTableProperties()));
    +
    +    if (storeTableNodeSpec.hasTableName()) {
    +      insertNode.setTableName(storeTableNodeSpec.getTableName());
    +    }
    +    if (storeTableNodeSpec.hasPartitionMethod()) {
    +      insertNode.setPartitionMethod(new 
PartitionMethodDesc(storeTableNodeSpec.getPartitionMethod()));
    +    }
    +
    +    insertNode.setOverwrite(insertNodeSpec.getOverwrite());
    +    
insertNode.setTableSchema(convertSchema(insertNodeSpec.getTableSchema()));
    +    if (insertNodeSpec.hasTargetSchema()) {
    +      
insertNode.setTargetSchema(convertSchema(insertNodeSpec.getTargetSchema()));
    +    }
    +    if (insertNodeSpec.hasProjectedSchema()) {
    +      
insertNode.setProjectedSchema(convertSchema(insertNodeSpec.getProjectedSchema()));
    +    }
    +    if (insertNodeSpec.hasPath()) {
    +      insertNode.setPath(new Path(insertNodeSpec.getPath()));
    +    }
    +
    +    return insertNode;
    +  }
    +
    +  private static DropTableNode convertDropTable(PlanProto.LogicalNode 
protoNode) {
    +    DropTableNode dropTable = new DropTableNode(protoNode.getPid());
    +
    +    PlanProto.DropTableNode dropTableProto = protoNode.getDropTable();
    +    dropTable.init(dropTableProto.getTableName(), 
dropTableProto.getIfExists(), dropTableProto.getPurge());
    +
    +    return dropTable;
    +  }
    +
    +  private static CreateDatabaseNode 
convertCreateDatabase(PlanProto.LogicalNode protoNode) {
    +    CreateDatabaseNode createDatabase = new 
CreateDatabaseNode(protoNode.getPid());
    +
    +    PlanProto.CreateDatabaseNode createDatabaseProto = 
protoNode.getCreateDatabase();
    +    createDatabase.init(createDatabaseProto.getDbName(), 
createDatabaseProto.getIfNotExists());
    +
    +    return createDatabase;
    +  }
    +
    +  private static DropDatabaseNode 
convertDropDatabase(PlanProto.LogicalNode protoNode) {
    +    DropDatabaseNode dropDatabase = new 
DropDatabaseNode(protoNode.getPid());
    +
    +    PlanProto.DropDatabaseNode dropDatabaseProto = 
protoNode.getDropDatabase();
    +    dropDatabase.init(dropDatabaseProto.getDbName(), 
dropDatabaseProto.getIfExists());
    +
    +    return dropDatabase;
    +  }
    +
    +  private static AlterTablespaceNode 
convertAlterTablespace(PlanProto.LogicalNode protoNode) {
    +    AlterTablespaceNode alterTablespace = new 
AlterTablespaceNode(protoNode.getPid());
    +
    +    PlanProto.AlterTablespaceNode alterTablespaceProto = 
protoNode.getAlterTablespace();
    +    
alterTablespace.setTablespaceName(alterTablespaceProto.getTableSpaceName());
    +
    +    switch (alterTablespaceProto.getSetType()) {
    +    case LOCATION:
    +      
alterTablespace.setLocation(alterTablespaceProto.getSetLocation().getLocation());
    +      break;
    +    default:
    +      throw new UnimplementedException("Unknown SET type in ALTER TABLE: " 
+ alterTablespaceProto.getSetType().name());
    +    }
    +
    +    return alterTablespace;
    +  }
    +
    +  private static AlterTableNode convertAlterTable(PlanProto.LogicalNode 
protoNode) {
    +    AlterTableNode alterTable = new AlterTableNode(protoNode.getPid());
    +
    +    PlanProto.AlterTableNode alterTableProto = protoNode.getAlterTable();
    +    alterTable.setTableName(alterTableProto.getTableName());
    +
    +    switch (alterTableProto.getSetType()) {
    +    case RENAME_TABLE:
    +      
alterTable.setNewTableName(alterTableProto.getRenameTable().getNewName());
    +      break;
    +    case ADD_COLUMN:
    +      alterTable.setAddNewColumn(new 
Column(alterTableProto.getAddColumn().getAddColumn()));
    +      break;
    +    case RENAME_COLUMN:
    +      
alterTable.setColumnName(alterTableProto.getRenameColumn().getOldName());
    +      
alterTable.setNewColumnName(alterTableProto.getRenameColumn().getNewName());
    +      break;
    +    default:
    +      throw new UnimplementedException("Unknown SET type in ALTER TABLE: " 
+ alterTableProto.getSetType().name());
    +    }
    +
    +    return alterTable;
    +  }
    +
    +  private static TruncateTableNode 
convertTruncateTable(PlanProto.LogicalNode protoNode) {
    +    TruncateTableNode truncateTable = new 
TruncateTableNode(protoNode.getPid());
    +
    +    PlanProto.TruncateTableNode truncateTableProto = 
protoNode.getTruncateTableNode();
    +    truncateTable.setTableNames(truncateTableProto.getTableNamesList());
    +
    +    return truncateTable;
    +  }
    +
    +  private static AggregationFunctionCallEval [] 
convertAggFuncCallEvals(OverridableConf context,
    +                                                                       
List<PlanProto.EvalNodeTree> evalTrees) {
    +    AggregationFunctionCallEval [] aggFuncs = new 
AggregationFunctionCallEval[evalTrees.size()];
    +    for (int i = 0; i < aggFuncs.length; i++) {
    +      aggFuncs[i] = (AggregationFunctionCallEval) 
EvalNodeDeserializer.deserialize(context, evalTrees.get(i));
    +    }
    +    return aggFuncs;
    +  }
    +
    +  private static WindowFunctionEval[] 
convertWindowFunccEvals(OverridableConf context,
    +                                                                       
List<PlanProto.EvalNodeTree> evalTrees) {
    +    WindowFunctionEval[] winFuncEvals = new 
WindowFunctionEval[evalTrees.size()];
    +    for (int i = 0; i < winFuncEvals.length; i++) {
    +      winFuncEvals[i] = (WindowFunctionEval) 
EvalNodeDeserializer.deserialize(context, evalTrees.get(i));
    +    }
    +    return winFuncEvals;
    +  }
    +
    +  public static Schema convertSchema(CatalogProtos.SchemaProto proto) {
    +    return new Schema(proto);
    +  }
    +
    +  public static Column[] convertColumns(List<CatalogProtos.ColumnProto> 
columnProtos) {
    +    Column [] columns = new Column[columnProtos.size()];
    +    for (int i = 0; i < columns.length; i++) {
    +      columns[i] = new Column(columnProtos.get(i));
    +    }
    +    return columns;
    +  }
    +
    +  public static Target[] convertTargets(OverridableConf context, 
List<PlanProto.Target> targetsProto) {
    +    Target [] targets = new Target[targetsProto.size()];
    +    for (int i = 0; i < targets.length; i++) {
    +      PlanProto.Target targetProto = targetsProto.get(i);
    +      EvalNode evalNode = EvalNodeDeserializer.deserialize(context, 
targetProto.getExpr());
    +      if (targetProto.hasAlias()) {
    +        targets[i] = new Target(evalNode, targetProto.getAlias());
    +      } else {
    +        targets[i] = new Target((FieldEval) evalNode);
    +      }
    +    }
    +    return targets;
    +  }
    +
    +  public static SortSpec[] 
convertSortSpecs(List<CatalogProtos.SortSpecProto> sortSpecProtos) {
    +    SortSpec[] sortSpecs = new SortSpec[sortSpecProtos.size()];
    +    int i = 0;
    +    for (CatalogProtos.SortSpecProto proto : sortSpecProtos) {
    +      sortSpecs[i++] = new SortSpec(proto);
    +    }
    +    return sortSpecs;
    +  }
    +
    +  public static JoinType convertJoinType(PlanProto.JoinType type) {
    +    switch (type) {
    +    case CROSS_JOIN:
    --- End diff --
    
    Got it.


> Protocol buffer De/Serialization for LogicalNode
> ------------------------------------------------
>
>                 Key: TAJO-269
>                 URL: https://issues.apache.org/jira/browse/TAJO-269
>             Project: Tajo
>          Issue Type: Improvement
>          Components: query master, worker
>            Reporter: Jihoon Son
>            Assignee: Hyunsik Choi
>             Fix For: 0.10
>
>         Attachments: TAJO-269.patch, TAJO-269_2.patch, TAJO-269_3.patch
>
>
> In the current implementation, the logical plan is serialized into a JSON 
> object and sent to each worker.
> However, transmitting the JSON object incurs high overhead due to its 
> large size.
> ProtocolBuffer is a good alternative because its overhead is quite small and 
> it is already used in other modules of Tajo.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to