[ https://issues.apache.org/jira/browse/ACCUMULO-3509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081796#comment-15081796 ]

ASF GitHub Bot commented on ACCUMULO-3509:
------------------------------------------

Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/62#discussion_r48780266
  
    --- Diff: test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java ---
    @@ -0,0 +1,181 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.test.functional;
    +
    +import static java.nio.charset.StandardCharsets.UTF_8;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +
    +import org.apache.accumulo.core.client.BatchWriter;
    +import org.apache.accumulo.core.client.BatchWriterConfig;
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.accumulo.core.client.IteratorSetting;
    +import org.apache.accumulo.core.client.Scanner;
    +import org.apache.accumulo.core.client.admin.ActiveScan;
    +import org.apache.accumulo.core.client.admin.InstanceOperations;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.conf.Property;
    +import org.apache.accumulo.core.data.Key;
    +import org.apache.accumulo.core.data.Mutation;
    +import org.apache.accumulo.core.data.Range;
    +import org.apache.accumulo.core.data.Value;
    +import org.apache.accumulo.core.security.Authorizations;
    +import org.apache.accumulo.harness.AccumuloClusterIT;
    +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.Text;
    +import org.junit.Before;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Verify that we have resolved blocking issue by ensuring that we have 
not lost scan sessions which we know to currently be running
    + */
    +public class SessionBlockVerifyIT extends AccumuloClusterIT {
    +  private static final Logger log = 
LoggerFactory.getLogger(SessionBlockVerifyIT.class);
    +
    +  @Override
    +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, 
Configuration hadoopCoreSite) {
    +    Map<String,String> siteConfig = cfg.getSiteConfig();
    +    cfg.setNumTservers(1);
    +    siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), "1s");
    +    siteConfig.put(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey(), "11");
    +    cfg.setSiteConfig(siteConfig);
    +  }
    +
    +  @Override
    +  protected int defaultTimeoutSeconds() {
    +    return 60;
    +  }
    +
    +  private String sessionIdle = null;
    +
    +  @Before
    +  public void reduceSessionIdle() throws Exception {
    +
    +    InstanceOperations ops = getConnector().instanceOperations();
    +    sessionIdle = 
ops.getSystemConfiguration().get(Property.TSERV_SESSION_MAXIDLE.getKey());
    +    ops.setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), "1s");
    +    log.info("Waiting for existing session idle time to expire");
    +    Thread.sleep(AccumuloConfiguration.getTimeInMillis(sessionIdle));
    +    log.info("Finished waiting");
    +  }
    +
    +  ExecutorService service = Executors.newFixedThreadPool(10);
    +
    +  @Test
    +  public void run() throws Exception {
    +    Connector c = getConnector();
    +    String tableName = getUniqueNames(1)[0];
    +    c.tableOperations().create(tableName);
    +
    +    BatchWriter bw = c.createBatchWriter(tableName, new 
BatchWriterConfig());
    +
    +    for (int i = 0; i < 1000; i++) {
    +      Mutation m = new Mutation(new Text(String.format("%08d", i)));
    +      for (int j = 0; j < 3; j++)
    +        m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + 
j).getBytes(UTF_8)));
    +
    +      bw.addMutation(m);
    +    }
    +
    +    bw.close();
    +
    +    Scanner scanner = c.createScanner(tableName, new Authorizations());
    +    scanner.setReadaheadThreshold(20000);
    +    scanner.setRange(new Range(String.format("%08d", 0), 
String.format("%08d", 1000)));
    +
    +    // test by making a slow iterator and then a couple of fast ones.
    +    // when then checking we shouldn't have any running except the slow 
iterator
    +    IteratorSetting setting = new IteratorSetting(21, SlowIterator.class);
    +    SlowIterator.setSeekSleepTime(setting, Long.MAX_VALUE);
    +    SlowIterator.setSleepTime(setting, Long.MAX_VALUE);
    +    scanner.addScanIterator(setting);
    +
    +    final Iterator<Entry<Key,Value>> slow = scanner.iterator();
    +
    +    final List<Future<Boolean>> callables = new 
ArrayList<Future<Boolean>>();
    +    for (int i = 0; i < 10; i++) {
    +      Future<Boolean> callable = service.submit(new Callable<Boolean>() {
    +        public Boolean call() {
    +          while (slow.hasNext()) {
    +
    +            slow.next();
    +          }
    +          return slow.hasNext();
    +        }
    +      });
    +      callables.add(callable);
    +    }
    +
    +    Thread.sleep(10000);
    +    log.info("Starting");
    +    // let's add more for good measure.
    +    for (int i = 0; i < 2; i++) {
    +      Scanner scanner2 = c.createScanner(tableName, new Authorizations());
    +
    +      scanner2.setRange(new Range(String.format("%08d", 0), 
String.format("%08d", 1000)));
    +
    +      scanner2.setBatchSize(1);
    +      Iterator<Entry<Key,Value>> iter = scanner2.iterator();
    +      sinkKeyValues(iter);
    +
    +    }
    +
    +    int sessionsFound = 0;
    +    // we have configured 1 tserver, so we can grab the one and only
    +    String tserver = 
c.instanceOperations().getTabletServers().iterator().next();
    +
    +    final List<ActiveScan> scans = 
c.instanceOperations().getActiveScans(tserver);
    +
    +    for (ActiveScan scan : scans) {
    +      // only here to minimize chance of seeing meta extent scans
    +
    +      if (tableName.equals(scan.getTable()) && scan.getSsiList().size() > 
0) {
    +        assertEquals("Not the expected iterator", 1, 
scan.getSsiList().size());
    +        assertTrue("Not the expected iterator", 
scan.getSsiList().iterator().next().contains("SlowIterator"));
    +        sessionsFound++;
    +      }
    +
    +    }
    +
    +    assertEquals("Must have ten sessions to ensure 3509 is resolved", 10, 
sessionsFound);
    +    for (Future<Boolean> callable : callables) {
    +      callable.cancel(true);
    +    }
    +    service.shutdown();
    +  }
    +
    +  private void sinkKeyValues(Iterator<Entry<Key,Value>> iter) throws 
Exception {
    +    while (iter.hasNext()) {
    --- End diff --
    
    Could you do a quick summation of the values you read to make sure you got all of the expected key-value pairs?


> Scanner lock causes Tablet lock, hence preventing idle scans from being swept, hence blocking SimpleTimer thread
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: ACCUMULO-3509
>                 URL: https://issues.apache.org/jira/browse/ACCUMULO-3509
>             Project: Accumulo
>          Issue Type: Bug
>          Components: tserver
>    Affects Versions: 1.6.0
>            Reporter: marco polo
>            Assignee: marco polo
>             Fix For: 1.6.5, 1.7.1, 1.8.0
>
>
> Synchronization on Tablet$Scanner during a read() blocks close() when close() is
> called from the sweep method in TabletServer. As a result, the SimpleTimer
> thread cannot continue, and the number of idle threads grows until the scan
> completes. My forthcoming patch converts the synchronized methods to use a fair
> lock. If the lock is held by a read call, the close call will attempt to obtain
> it, time out, and return an indication that the close was not successful. The
> sweep then continues, and the SimpleTimer will run it again later, retrying the
> close on those Tablets at that time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to