This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit f19890c73276cb252a93b8ef28f7b177856236d9 Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Tue Jun 2 13:30:11 2020 +0800 KYLIN-4305 Streaming Receiver cannot limit income query request or cancel long-running query --- .../org/apache/kylin/common/KylinConfigBase.java | 7 +- .../stream/rpc/HttpStreamDataSearchClient.java | 11 ++- .../kylin/stream/core/model/DataRequest.java | 9 ++ .../core/query/MultiThreadsResultCollector.java | 107 +++++++++++++++------ .../core/query/StreamingCubeDataSearcher.java | 17 ++-- .../stream/core/query/StreamingSearchContext.java | 10 ++ .../server/rest/controller/DataController.java | 4 +- 7 files changed, 119 insertions(+), 46 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 3429963..0e52818 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -57,6 +57,7 @@ import java.util.regex.Pattern; public abstract class KylinConfigBase implements Serializable { private static final long serialVersionUID = 1L; private static final Logger logger = LoggerFactory.getLogger(KylinConfigBase.class); + private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); private static final String FALSE = "false"; private static final String TRUE = "true"; @@ -2478,11 +2479,13 @@ public abstract class KylinConfigBase implements Serializable { } public int getStreamingReceiverQueryCoreThreads() { - return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50")); + int def = getStreamingReceiverQueryMaxThreads() - 1; + return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def + "")); } public int getStreamingReceiverQueryMaxThreads() { - return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200")); + int def = Math.max(2, AVAILABLE_PROCESSORS - 1); + return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", def + "")); } public int getStreamingReceiverUseThreadsPerQuery() { diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java index 3ae5e29..d1c0bd5 100644 --- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java +++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java @@ -74,6 +74,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; */ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { public static final Logger logger = LoggerFactory.getLogger(HttpStreamDataSearchClient.class); + public static final long WAIT_DURATION = 2 * 60000 ; private static ExecutorService executorService; static { @@ -97,7 +98,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { final TupleFilter tupleFilter, final Set<TblColRef> dimensions, final Set<TblColRef> groups, final Set<FunctionDesc> metrics, final int storagePushDownLimit, final boolean allowStorageAggregation) { List<ReplicaSet> replicaSetsOfCube = assignmentsCache.getReplicaSetsByCube(cube.getName()); - int timeout = 120 * 1000; // timeout should be configurable + int timeout = cube.getConfig().getStreamingRPCHttpReadTimeout() * 2; final QueuedStreamingTupleIterator result = new QueuedStreamingTupleIterator(replicaSetsOfCube.size(), timeout); final QueryContext query = QueryContextFacade.current(); @@ -167,7 +168,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { return foundReceiver; } - if (System.currentTimeMillis() - lastFailTime > 2 * 60 * 1000) { // retry every 2 minutes + if (System.currentTimeMillis() - lastFailTime > WAIT_DURATION) { // retry every 2 minutes return foundReceiver; } @@ -177,16 +178,16 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter, RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception { String queryId = dataRequest.getQueryId(); - logger.info("send query to receiver " + receiver + " with query id:" + queryId); String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query"; try { + int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout(); + int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout(); + dataRequest.setDeadline(System.currentTimeMillis() + (int)(readTimeout * 1.5)); String content = JsonUtil.writeValueAsString(dataRequest); Stopwatch sw; sw = Stopwatch.createUnstarted(); sw.start(); - int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout(); - int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout(); String msg = restService.postRequest(url, content, connTimeout, readTimeout); logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsed(MILLISECONDS)); diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java index 07c9028..f32b751 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java @@ -36,6 +36,7 @@ public class DataRequest { private boolean allowStorageAggregation; private long requestSendTime; + private long deadline; private boolean enableDetailProfile; private String storageBehavior; @@ -142,4 +143,12 @@ public class DataRequest { public void setHavingFilter(String havingFilter) { this.havingFilter = havingFilter; } + + public long getDeadline() { + return deadline; + } + + public void setDeadline(long deadline) { + this.deadline = deadline; + } } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java index 25cc4e8..0ca08e4 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java @@ -14,18 +14,19 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.stream.core.query; import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kylin.common.KylinConfig; @@ -38,28 +39,40 @@ import org.apache.kylin.shaded.com.google.common.collect.Lists; public class MultiThreadsResultCollector extends ResultCollector { private static Logger logger = LoggerFactory.getLogger(MultiThreadsResultCollector.class); - private static ExecutorService executor; + private static ThreadPoolExecutor scannerThreadPool; + private static int MAX_RUNNING_THREAD_COUNT; + static { KylinConfig config = KylinConfig.getInstanceFromEnv(); - executor = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(), - config.getStreamingReceiverQueryMaxThreads(), 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("query-worker")); + MAX_RUNNING_THREAD_COUNT = config.getStreamingReceiverQueryMaxThreads(); + scannerThreadPool = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(), + MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), new NamedThreadFactory("query-worker")); } - private int timeout; + private long deadline; + private String queryId; + + /** + * if current query exceeded the deadline + */ + private AtomicBoolean cancelFlag = new AtomicBoolean(false); private Semaphore workersSemaphore; - final BlockingQueue<Record> queue = new LinkedBlockingQueue<>(10000); private AtomicInteger notCompletedWorkers; - public MultiThreadsResultCollector(int numOfWorkers, int timeout) { + final private BlockingQueue<Record> recordCachePool = new LinkedBlockingQueue<>(10000); + + public MultiThreadsResultCollector(int numOfWorkers, long deadline) { this.workersSemaphore = new Semaphore(numOfWorkers); - this.timeout = timeout; + this.deadline = deadline; + this.queryId = StreamingQueryProfile.get().getQueryId(); } @Override public Iterator<Record> iterator() { notCompletedWorkers = new AtomicInteger(searchResults.size()); - executor.submit(new WorkSubmitter()); + Thread masterThread = new Thread(new WorkSubmitter(), "MultiThreadsResultCollector_" + queryId); + masterThread.start(); final int batchSize = 100; final long startTime = System.currentTimeMillis(); @@ -69,38 +82,41 @@ public class MultiThreadsResultCollector extends ResultCollector { @Override public boolean hasNext() { - boolean exits = (internalIT.hasNext() || queue.size() > 0); + boolean exits = (internalIT.hasNext() || !recordCachePool.isEmpty()); if (!exits) { while (notCompletedWorkers.get() > 0) { Thread.yield(); - long takeTime = System.currentTimeMillis() - startTime; - if (takeTime > timeout) { + if (System.currentTimeMillis() > deadline) { + masterThread.interrupt(); // notify main thread + cancelFlag.set(true); + logger.warn("Beyond the deadline for {}.", queryId); throw new RuntimeException("Timeout when iterate search result"); } - if (internalIT.hasNext() || queue.size() > 0) { + if (internalIT.hasNext() || !recordCachePool.isEmpty()) { return true; } } } - return exits; } @Override public Record next() { try { - long takeTime = System.currentTimeMillis() - startTime; - if (takeTime > timeout) { + if (System.currentTimeMillis() > deadline) { throw new RuntimeException("Timeout when iterate search result"); } if (!internalIT.hasNext()) { recordList.clear(); - Record one = queue.poll(timeout - takeTime, TimeUnit.MILLISECONDS); + Record one = recordCachePool.poll(deadline - startTime, TimeUnit.MILLISECONDS); if (one == null) { + masterThread.interrupt(); // notify main thread + cancelFlag.set(true); + logger.debug("Exceeded the deadline for {}.", queryId); throw new RuntimeException("Timeout when iterate search result"); } recordList.add(one); - queue.drainTo(recordList, batchSize - 1); + recordCachePool.drainTo(recordList, batchSize - 1); internalIT = recordList.iterator(); } return internalIT.next(); @@ -128,15 +144,13 @@ public class MultiThreadsResultCollector extends ResultCollector { try { result.startRead(); for (Record record : result) { - try { - queue.put(record.copy()); - } catch (InterruptedException e) { - throw new RuntimeException("Timeout when visiting streaming segmenent", e); - } + recordCachePool.put(record.copy()); } result.endRead(); + } catch (InterruptedException inter) { + logger.debug("Cancelled scan streaming segment", inter); } catch (Exception e) { - logger.error("error when iterate search result", e); + logger.warn("Error when iterate search result", e); } finally { notCompletedWorkers.decrementAndGet(); workersSemaphore.release(); @@ -147,15 +161,44 @@ public class MultiThreadsResultCollector extends ResultCollector { private class WorkSubmitter implements Runnable { @Override public void run() { - for (final IStreamingSearchResult result : searchResults) { - executor.submit(new ResultIterateWorker(result)); - try { - workersSemaphore.acquire(); - } catch (InterruptedException e) { - logger.error("interrupted", e); + List<Future> futureList = Lists.newArrayListWithExpectedSize(searchResults.size()); + int cancelTimes = 0; + try { + for (final IStreamingSearchResult result : searchResults) { + Future f = scannerThreadPool.submit(new ResultIterateWorker(result)); + futureList.add(f); + workersSemaphore.acquire(); // Throw InterruptedException when interrupted + } + while (notCompletedWorkers.get() > 0) { + Thread.sleep(100); + if (cancelFlag.get() || Thread.currentThread().isInterrupted()) { + break; + } + } + } catch (InterruptedException inter) { + logger.warn("Interrupted", inter); + } finally { + for (Future f : futureList) { + if (!f.isCancelled() || !f.isDone()) { + if (f.cancel(true)) { + cancelTimes++; + } + } } } + logger.debug("Finish MultiThreadsResultCollector for queryId {}, cancel {}. Current thread pool: {}.", + queryId, cancelTimes, scannerThreadPool); } } + /** + * block query if return true + */ + public static boolean isFullUp() { + boolean occupied = scannerThreadPool.getActiveCount() >= MAX_RUNNING_THREAD_COUNT; + if (occupied) { + logger.debug("ThreadPool {}", scannerThreadPool); + } + return occupied; + } } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java index f89ddec..42972ad 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java @@ -43,9 +43,6 @@ import org.slf4j.LoggerFactory; public class StreamingCubeDataSearcher { private static Logger logger = LoggerFactory.getLogger(StreamingCubeDataSearcher.class); - - private static int TIMEOUT = Integer.MAX_VALUE; - private StreamingSegmentManager streamingSegmentManager; private String cubeName; private CubeDesc cubeDesc; @@ -73,7 +70,15 @@ public class StreamingCubeDataSearcher { try { logger.info("query-{}: use cuboid {} to serve the query", queryProfile.getQueryId(), searchRequest.getHitCuboid()); - ResultCollector resultCollector = getResultCollector(); + ResultCollector resultCollector = getResultCollector(searchRequest); + if (resultCollector instanceof MultiThreadsResultCollector) { + while (MultiThreadsResultCollector.isFullUp() && System.currentTimeMillis() < searchRequest.getDeadline()) { + Thread.sleep(50); + } + if (System.currentTimeMillis() >= searchRequest.getDeadline()) { + throw new RuntimeException("Timeout for " + queryProfile.getQueryId()); + } + } Collection<StreamingCubeSegment> segments = streamingSegmentManager.getAllSegments(); StreamingDataQueryPlanner scanRangePlanner = searchRequest.getQueryPlanner(); for (StreamingCubeSegment queryableSegment : segments) { @@ -105,10 +110,10 @@ public class StreamingCubeDataSearcher { } } - private ResultCollector getResultCollector() { + private ResultCollector getResultCollector(StreamingSearchContext searchRequest) { int useThreads = cubeDesc.getConfig().getStreamingReceiverUseThreadsPerQuery(); if (useThreads > 1) { - return new MultiThreadsResultCollector(useThreads, TIMEOUT); + return new MultiThreadsResultCollector(useThreads, searchRequest.getDeadline()); } else { return new SingleThreadResultCollector(); } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java index a7d3f84..4c92470 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java @@ -45,6 +45,8 @@ public class StreamingSearchContext { private long hitCuboid; private long basicCuboid; + private long deadline = Long.MAX_VALUE; + public StreamingSearchContext(CubeDesc cubeDesc, Set<TblColRef> dimensions, Set<TblColRef> groups, Set<FunctionDesc> metrics, TupleFilter filter, TupleFilter havingFilter) { this.cubeDesc = cubeDesc; @@ -158,4 +160,12 @@ public class StreamingSearchContext { sortedSet.addAll(cubeDesc.getMandatoryCuboids()); return sortedSet; } + + long getDeadline() { + return deadline; + } + + public void setDeadline(long deadline) { + this.deadline = deadline; + } } diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java index c21afe5..8d6bda6 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.commons.codec.binary.Base64; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.gridtable.StorageSideBehavior; @@ -81,7 +82,7 @@ public class DataController extends BasicController { } StreamingQueryProfile.set(queryProfile); logger.info("receive query request queryId:{}", queryId); - try { + try (SetThreadName changeName = new SetThreadName("Query %s", queryId)) { final Stopwatch sw = Stopwatch.createUnstarted(); sw.start(); String cubeName = dataRequest.getCubeName(); @@ -107,6 +108,7 @@ public class DataController extends BasicController { StreamingSearchContext gtSearchRequest = new StreamingSearchContext(cubeDesc, dimensions, groups, metrics, tupleFilter, havingFilter); + gtSearchRequest.setDeadline(dataRequest.getDeadline()); searchResult = dataSearcher.doSearch(gtSearchRequest, minSegmentTime, dataRequest.isAllowStorageAggregation());