Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/224#discussion_r104520129
  
    --- Diff: 
core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java ---
    @@ -0,0 +1,543 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.accumulo.core.summary;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.PrintStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +
    +import org.apache.accumulo.core.client.summary.Summarizer;
    +import org.apache.accumulo.core.client.summary.Summarizer.Collector;
    +import org.apache.accumulo.core.client.summary.Summarizer.Combiner;
    +import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
    +import org.apache.accumulo.core.data.ByteSequence;
    +import org.apache.accumulo.core.data.Key;
    +import org.apache.accumulo.core.data.Value;
    +import org.apache.accumulo.core.summary.Gatherer.RowRange;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.WritableUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ImmutableMap;
    +
    +/**
    + * This class supports serializing summaries and periodically storing 
summaries. The implementation attempts to generate around 10 summaries that 
are evenly
    + * spaced. This allows asking for summaries for sub-ranges of data in a 
rfile.
    + *
    + * <p>
    + * At first summaries are created for every 1000 key values. After 10 
summaries are added, the 10 summaries are merged to 5 and summaries are then 
created for
    + * every 2000 key values. The code keeps merging summaries and doubling 
the number of key values per summary. This results in each summary covering 
about the
    + * same number of key values.
    + *
    + */
    +
    +class SummarySerializer {
    +
    +  private SummarizerConfiguration sconf;
    +  private LgSummaries[] allSummaries;
    +
    +  private SummarySerializer(SummarizerConfiguration sconf, LgSummaries[] 
allSummaries) {
    +    this.sconf = sconf;
    +    this.allSummaries = allSummaries;
    +  }
    +
    +  private SummarySerializer(SummarizerConfiguration sconf) {
    +    this.sconf = sconf;
    +    // this indicates max size was exceeded
    +    this.allSummaries = null;
    +  }
    +
    +  public SummarizerConfiguration getSummarizerConfiguration() {
    +    return sconf;
    +  }
    +
    +  public void print(String prefix, String indent, PrintStream out) {
    +
    +    if (allSummaries == null) {
    +      out.printf("%sSummary not stored because it was too large\n", prefix 
+ indent);
    +    } else {
    +      for (LgSummaries lgs : allSummaries) {
    +        lgs.print(prefix, indent, out);
    +      }
    +    }
    +  }
    +
    +  public Map<String,Long> getSummary(List<RowRange> ranges, 
SummarizerFactory sf) {
    +
    +    Summarizer kvs = sf.getSummarizer(sconf);
    +
    +    Map<String,Long> summary = new HashMap<>();
    +    for (LgSummaries lgs : allSummaries) {
    +      lgs.getSummary(ranges, kvs.combiner(sconf), summary);
    +    }
    +    return summary;
    +  }
    +
    +  public boolean exceedsRange(List<RowRange> ranges) {
    +    boolean er = false;
    +    for (LgSummaries lgs : allSummaries) {
    +      for (RowRange ke : ranges) {
    +        er |= lgs.exceedsRange(ke.getStartRow(), ke.getEndRow());
    +        if (er) {
    +          return er;
    +        }
    +      }
    +    }
    +
    +    return er;
    +  }
    +
    +  public boolean exceededMaxSize() {
    +    return allSummaries == null;
    +  }
    +
    +  private static class SummaryStoreImpl implements 
org.apache.accumulo.core.client.summary.Summarizer.StatisticConsumer {
    +
    +    HashMap<String,Long> summaries;
    +
    +    @Override
    +    public void accept(String summary, long value) {
    +      summaries.put(summary, value);
    +    }
    +  }
    +
    +  private static class LgBuilder {
    +    private Summarizer summarizer;
    +    private SummarizerConfiguration conf;
    +    private Collector collector;
    +
    +    private int maxSummaries = 10;
    +
    +    private int cutoff = 1000;
    +    private int count = 0;
    +
    +    private List<SummaryInfo> summaries = new ArrayList<>();
    +
    +    private Key lastKey;
    +
    +    private SummaryStoreImpl sci = new SummaryStoreImpl();
    +
    +    private String name;
    +
    +    private boolean sawFirst = false;
    +    private Text firstRow;
    +
    +    private boolean finished = false;
    +
    +    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs) {
    +      this.conf = conf;
    +      this.summarizer = kvs;
    +      this.name = "<DEFAULT>";
    +      this.collector = kvs.collector(conf);
    +    }
    +
    +    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs, String 
name) {
    +      this.conf = conf;
    +      this.summarizer = kvs;
    +      this.name = name;
    +      this.collector = kvs.collector(conf);
    +    }
    +
    +    public void put(Key k, Value v) {
    +      collector.accept(k, v);
    +      count++;
    +
    +      if (!sawFirst) {
    +        firstRow = k.getRow();
    +        sawFirst = true;
    +
    +      }
    +
    +      if (count >= cutoff) {
    +        sci.summaries = new HashMap<>();
    +        collector.summarize(sci);
    +        collector = summarizer.collector(conf);
    +        addSummary(k.getRow(), sci.summaries, count);
    +        count = 0;
    +      }
    +
    +      lastKey = k;
    +    }
    +
    +    private List<SummaryInfo> merge(int end) {
    +      List<SummaryInfo> mergedSummaries = new ArrayList<>();
    +      for (int i = 0; i < end; i += 2) {
    +        int mergedCount = summaries.get(i).count + summaries.get(i + 
1).count;
    +        summarizer.combiner(conf).merge(summaries.get(i).summary, 
summaries.get(i + 1).summary);
    +        mergedSummaries.add(new SummaryInfo(summaries.get(i + 
1).getLastRow(), summaries.get(i).summary, mergedCount));
    +      }
    +      return mergedSummaries;
    +    }
    +
    +    private void addSummary(Text row, Map<String,Long> summary, int count) 
{
    +      Preconditions.checkState(!finished);
    +      summaries.add(new SummaryInfo(row, summary, count));
    +
    +      if (summaries.size() % 2 == 0 && summaries.size() > maxSummaries) {
    +        summaries = merge(summaries.size());
    +        cutoff *= 2;
    +      }
    +    }
    +
    +    boolean collapse() {
    +      Preconditions.checkState(finished);
    +      if (summaries.size() <= 1) {
    +        return false;
    +      }
    +
    +      int end = summaries.size();
    +      if (end % 2 == 1) {
    +        end--;
    +      }
    +
    +      List<SummaryInfo> mergedSummaries = merge(end);
    +
    +      if (summaries.size() % 2 == 1) {
    +        mergedSummaries.add(summaries.get(summaries.size() - 1));
    +      }
    +
    +      summaries = mergedSummaries;
    +
    +      return true;
    +    }
    +
    +    void finish() {
    +      Preconditions.checkState(!finished);
    +      // summarize last data
    +      if (count > 0) {
    +        sci.summaries = new HashMap<>();
    +        collector.summarize(sci);
    +        collector = null;
    +        addSummary(lastKey.getRow(), sci.summaries, count);
    +        count = 0;
    +        finished = true;
    +      }
    +    }
    +
    +    public void save(DataOutputStream dos, HashMap<String,Integer> 
symbolTable) throws IOException {
    +      Preconditions.checkState(count == 0);
    +
    +      dos.writeUTF(name);
    +
    +      if (firstRow == null) {
    +        WritableUtils.writeVInt(dos, 0);
    +      } else {
    +        firstRow.write(dos);
    +      }
    +
    +      // write summaries
    +      WritableUtils.writeVInt(dos, summaries.size());
    +      for (SummaryInfo summaryInfo : summaries) {
    +        summaryInfo.getLastRow().write(dos);
    +        WritableUtils.writeVInt(dos, summaryInfo.count);
    +        saveSummary(dos, symbolTable, summaryInfo.summary);
    +      }
    +    }
    +
    +    private void saveSummary(DataOutputStream dos, HashMap<String,Integer> 
symbolTable, Map<String,Long> summary) throws IOException {
    +      WritableUtils.writeVInt(dos, summary.size());
    +      for (Entry<String,Long> e : summary.entrySet()) {
    +        WritableUtils.writeVInt(dos, symbolTable.get(e.getKey()));
    --- End diff --
    
    Would the `symbolTable` ever not have a mapping for the key?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to