This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f19890c73276cb252a93b8ef28f7b177856236d9
Author: XiaoxiangYu <hit_la...@126.com>
AuthorDate: Tue Jun 2 13:30:11 2020 +0800

    KYLIN-4305 Streaming Receiver cannot limit incoming query
     request or cancel long-running query
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   7 +-
 .../stream/rpc/HttpStreamDataSearchClient.java     |  11 ++-
 .../kylin/stream/core/model/DataRequest.java       |   9 ++
 .../core/query/MultiThreadsResultCollector.java    | 107 +++++++++++++++------
 .../core/query/StreamingCubeDataSearcher.java      |  17 ++--
 .../stream/core/query/StreamingSearchContext.java  |  10 ++
 .../server/rest/controller/DataController.java     |   4 +-
 7 files changed, 119 insertions(+), 46 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3429963..0e52818 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -57,6 +57,7 @@ import java.util.regex.Pattern;
 public abstract class KylinConfigBase implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger logger = 
LoggerFactory.getLogger(KylinConfigBase.class);
+    private static final int AVAILABLE_PROCESSORS = 
Runtime.getRuntime().availableProcessors();
 
     private static final String FALSE = "false";
     private static final String TRUE = "true";
@@ -2478,11 +2479,13 @@ public abstract class KylinConfigBase implements 
Serializable {
     }
 
     public int getStreamingReceiverQueryCoreThreads() {
-        return 
Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
+        int def = getStreamingReceiverQueryMaxThreads() - 1;
+        return 
Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def + 
""));
     }
 
     public int getStreamingReceiverQueryMaxThreads() {
-        return 
Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
+        int def = Math.max(2, AVAILABLE_PROCESSORS - 1);
+        return 
Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", def + 
""));
     }
 
     public int getStreamingReceiverUseThreadsPerQuery() {
diff --git 
a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
 
b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index 3ae5e29..d1c0bd5 100644
--- 
a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ 
b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -74,6 +74,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
  */
 public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
     public static final Logger logger = 
LoggerFactory.getLogger(HttpStreamDataSearchClient.class);
+    public static final long WAIT_DURATION = 2 * 60000 ;
 
     private static ExecutorService executorService;
     static {
@@ -97,7 +98,7 @@ public class HttpStreamDataSearchClient implements 
IStreamDataSearchClient {
             final TupleFilter tupleFilter, final Set<TblColRef> dimensions, 
final Set<TblColRef> groups,
             final Set<FunctionDesc> metrics, final int storagePushDownLimit, 
final boolean allowStorageAggregation) {
         List<ReplicaSet> replicaSetsOfCube = 
assignmentsCache.getReplicaSetsByCube(cube.getName());
-        int timeout = 120 * 1000; // timeout should be configurable
+        int timeout = cube.getConfig().getStreamingRPCHttpReadTimeout() * 2;
         final QueuedStreamingTupleIterator result = new 
QueuedStreamingTupleIterator(replicaSetsOfCube.size(), timeout);
         final QueryContext query = QueryContextFacade.current();
 
@@ -167,7 +168,7 @@ public class HttpStreamDataSearchClient implements 
IStreamDataSearchClient {
             return foundReceiver;
         }
 
-        if (System.currentTimeMillis() - lastFailTime > 2 * 60 * 1000) { // 
retry every 2 minutes
+        if (System.currentTimeMillis() - lastFailTime > WAIT_DURATION) { // 
retry every 2 minutes
             return foundReceiver;
         }
 
@@ -177,16 +178,16 @@ public class HttpStreamDataSearchClient implements 
IStreamDataSearchClient {
     public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance 
cube, StreamingTupleConverter tupleConverter,
             RecordsSerializer recordsSerializer, Node receiver, TupleInfo 
tupleInfo) throws Exception {
         String queryId = dataRequest.getQueryId();
-        logger.info("send query to receiver " + receiver + " with query id:" + 
queryId);
         String url = "http://" + receiver.getHost() + ":" + receiver.getPort() 
+ "/kylin/api/data/query";
 
         try {
+            int connTimeout = 
cube.getConfig().getStreamingRPCHttpConnTimeout();
+            int readTimeout = 
cube.getConfig().getStreamingRPCHttpReadTimeout();
+            dataRequest.setDeadline(System.currentTimeMillis() + 
(int)(readTimeout * 1.5));
             String content = JsonUtil.writeValueAsString(dataRequest);
             Stopwatch sw;
             sw = Stopwatch.createUnstarted();
             sw.start();
-            int connTimeout = 
cube.getConfig().getStreamingRPCHttpConnTimeout();
-            int readTimeout = 
cube.getConfig().getStreamingRPCHttpReadTimeout();
             String msg = restService.postRequest(url, content, connTimeout, 
readTimeout);
 
             logger.info("query-{}: receive response from {} take time:{}", 
queryId, receiver, sw.elapsed(MILLISECONDS));
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
index 07c9028..f32b751 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
@@ -36,6 +36,7 @@ public class DataRequest {
     private boolean allowStorageAggregation;
 
     private long requestSendTime;
+    private long deadline;
     private boolean enableDetailProfile;
     private String storageBehavior;
 
@@ -142,4 +143,12 @@ public class DataRequest {
     public void setHavingFilter(String havingFilter) {
         this.havingFilter = havingFilter;
     }
+
+    public long getDeadline() {
+        return deadline;
+    }
+
+    public void setDeadline(long deadline) {
+        this.deadline = deadline;
+    }
 }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
index 25cc4e8..0ca08e4 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
@@ -14,18 +14,19 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.stream.core.query;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kylin.common.KylinConfig;
@@ -38,28 +39,40 @@ import 
org.apache.kylin.shaded.com.google.common.collect.Lists;
 
 public class MultiThreadsResultCollector extends ResultCollector {
     private static Logger logger = 
LoggerFactory.getLogger(MultiThreadsResultCollector.class);
-    private static ExecutorService executor;
+    private static ThreadPoolExecutor scannerThreadPool;
+    private static int MAX_RUNNING_THREAD_COUNT;
+
     static {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        executor = new 
ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
-                config.getStreamingReceiverQueryMaxThreads(), 60L, 
TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("query-worker"));
+        MAX_RUNNING_THREAD_COUNT = 
config.getStreamingReceiverQueryMaxThreads();
+        scannerThreadPool = new 
ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
+                MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(), new 
NamedThreadFactory("query-worker"));
     }
 
-    private int timeout;
+    private long deadline;
+    private String queryId;
+
+    /**
+     * if current query exceeded the deadline
+     */
+    private AtomicBoolean cancelFlag = new AtomicBoolean(false);
     private Semaphore workersSemaphore;
-    final BlockingQueue<Record> queue = new LinkedBlockingQueue<>(10000);
     private AtomicInteger notCompletedWorkers;
 
-    public MultiThreadsResultCollector(int numOfWorkers, int timeout) {
+    final private BlockingQueue<Record> recordCachePool = new 
LinkedBlockingQueue<>(10000);
+
+    public MultiThreadsResultCollector(int numOfWorkers, long deadline) {
         this.workersSemaphore = new Semaphore(numOfWorkers);
-        this.timeout = timeout;
+        this.deadline = deadline;
+        this.queryId = StreamingQueryProfile.get().getQueryId();
     }
 
     @Override
     public Iterator<Record> iterator() {
         notCompletedWorkers = new AtomicInteger(searchResults.size());
-        executor.submit(new WorkSubmitter());
+        Thread masterThread = new Thread(new WorkSubmitter(), 
"MultiThreadsResultCollector_" + queryId);
+        masterThread.start();
 
         final int batchSize = 100;
         final long startTime = System.currentTimeMillis();
@@ -69,38 +82,41 @@ public class MultiThreadsResultCollector extends 
ResultCollector {
 
             @Override
             public boolean hasNext() {
-                boolean exits = (internalIT.hasNext() || queue.size() > 0);
+                boolean exits = (internalIT.hasNext() || 
!recordCachePool.isEmpty());
                 if (!exits) {
                     while (notCompletedWorkers.get() > 0) {
                         Thread.yield();
-                        long takeTime = System.currentTimeMillis() - startTime;
-                        if (takeTime > timeout) {
+                        if (System.currentTimeMillis() > deadline) {
+                            masterThread.interrupt(); // notify main thread
+                            cancelFlag.set(true);
+                            logger.warn("Beyond the deadline for {}.", 
queryId);
                             throw new RuntimeException("Timeout when iterate 
search result");
                         }
-                        if (internalIT.hasNext() || queue.size() > 0) {
+                        if (internalIT.hasNext() || 
!recordCachePool.isEmpty()) {
                             return true;
                         }
                     }
                 }
-
                 return exits;
             }
 
             @Override
             public Record next() {
                 try {
-                    long takeTime = System.currentTimeMillis() - startTime;
-                    if (takeTime > timeout) {
+                    if (System.currentTimeMillis() > deadline) {
                         throw new RuntimeException("Timeout when iterate 
search result");
                     }
                     if (!internalIT.hasNext()) {
                         recordList.clear();
-                        Record one = queue.poll(timeout - takeTime, 
TimeUnit.MILLISECONDS);
+                        Record one = recordCachePool.poll(deadline - 
startTime, TimeUnit.MILLISECONDS);
                         if (one == null) {
+                            masterThread.interrupt(); // notify main thread
+                            cancelFlag.set(true);
+                            logger.debug("Exceeded the deadline for {}.", 
queryId);
                             throw new RuntimeException("Timeout when iterate 
search result");
                         }
                         recordList.add(one);
-                        queue.drainTo(recordList, batchSize - 1);
+                        recordCachePool.drainTo(recordList, batchSize - 1);
                         internalIT = recordList.iterator();
                     }
                     return internalIT.next();
@@ -128,15 +144,13 @@ public class MultiThreadsResultCollector extends 
ResultCollector {
             try {
                 result.startRead();
                 for (Record record : result) {
-                    try {
-                        queue.put(record.copy());
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException("Timeout when visiting 
streaming segmenent", e);
-                    }
+                    recordCachePool.put(record.copy());
                 }
                 result.endRead();
+            } catch (InterruptedException inter) {
+                logger.debug("Cancelled scan streaming segment", inter);
             } catch (Exception e) {
-                logger.error("error when iterate search result", e);
+                logger.warn("Error when iterate search result", e);
             } finally {
                 notCompletedWorkers.decrementAndGet();
                 workersSemaphore.release();
@@ -147,15 +161,44 @@ public class MultiThreadsResultCollector extends 
ResultCollector {
     private class WorkSubmitter implements Runnable {
         @Override
         public void run() {
-            for (final IStreamingSearchResult result : searchResults) {
-                executor.submit(new ResultIterateWorker(result));
-                try {
-                    workersSemaphore.acquire();
-                } catch (InterruptedException e) {
-                    logger.error("interrupted", e);
+            List<Future> futureList = 
Lists.newArrayListWithExpectedSize(searchResults.size());
+            int cancelTimes = 0;
+            try {
+                for (final IStreamingSearchResult result : searchResults) {
+                    Future f = scannerThreadPool.submit(new 
ResultIterateWorker(result));
+                    futureList.add(f);
+                    workersSemaphore.acquire(); // Throw InterruptedException 
when interrupted
+                }
+                while (notCompletedWorkers.get() > 0) {
+                    Thread.sleep(100);
+                    if (cancelFlag.get() || 
Thread.currentThread().isInterrupted()) {
+                        break;
+                    }
+                }
+            } catch (InterruptedException inter) {
+                logger.warn("Interrupted", inter);
+            } finally {
+                for (Future f : futureList) {
+                    if (!f.isCancelled() || !f.isDone()) {
+                        if (f.cancel(true)) {
+                            cancelTimes++;
+                        }
+                    }
                 }
             }
+            logger.debug("Finish MultiThreadsResultCollector for queryId {}, 
cancel {}. Current thread pool: {}.",
+                    queryId, cancelTimes, scannerThreadPool);
         }
     }
 
+    /**
+     * block query if return true
+     */
+    public static boolean isFullUp() {
+        boolean occupied = scannerThreadPool.getActiveCount() >= 
MAX_RUNNING_THREAD_COUNT;
+        if (occupied) {
+            logger.debug("ThreadPool {}", scannerThreadPool);
+        }
+        return occupied;
+    }
 }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
index f89ddec..42972ad 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
@@ -43,9 +43,6 @@ import org.slf4j.LoggerFactory;
 
 public class StreamingCubeDataSearcher {
     private static Logger logger = 
LoggerFactory.getLogger(StreamingCubeDataSearcher.class);
-
-    private static int TIMEOUT = Integer.MAX_VALUE;
-
     private StreamingSegmentManager streamingSegmentManager;
     private String cubeName;
     private CubeDesc cubeDesc;
@@ -73,7 +70,15 @@ public class StreamingCubeDataSearcher {
         try {
             logger.info("query-{}: use cuboid {} to serve the query", 
queryProfile.getQueryId(),
                     searchRequest.getHitCuboid());
-            ResultCollector resultCollector = getResultCollector();
+            ResultCollector resultCollector = 
getResultCollector(searchRequest);
+            if (resultCollector instanceof MultiThreadsResultCollector) {
+                while (MultiThreadsResultCollector.isFullUp() && 
System.currentTimeMillis() < searchRequest.getDeadline()) {
+                    Thread.sleep(50);
+                }
+                if (System.currentTimeMillis() >= searchRequest.getDeadline()) 
{
+                    throw new RuntimeException("Timeout for " + 
queryProfile.getQueryId());
+                }
+            }
             Collection<StreamingCubeSegment> segments = 
streamingSegmentManager.getAllSegments();
             StreamingDataQueryPlanner scanRangePlanner = 
searchRequest.getQueryPlanner();
             for (StreamingCubeSegment queryableSegment : segments) {
@@ -105,10 +110,10 @@ public class StreamingCubeDataSearcher {
         }
     }
 
-    private ResultCollector getResultCollector() {
+    private ResultCollector getResultCollector(StreamingSearchContext 
searchRequest) {
         int useThreads = 
cubeDesc.getConfig().getStreamingReceiverUseThreadsPerQuery();
         if (useThreads > 1) {
-            return new MultiThreadsResultCollector(useThreads, TIMEOUT);
+            return new MultiThreadsResultCollector(useThreads, 
searchRequest.getDeadline());
         } else {
             return new SingleThreadResultCollector();
         }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
index a7d3f84..4c92470 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
@@ -45,6 +45,8 @@ public class StreamingSearchContext {
     private long hitCuboid;
     private long basicCuboid;
 
+    private long deadline = Long.MAX_VALUE;
+
     public StreamingSearchContext(CubeDesc cubeDesc, Set<TblColRef> 
dimensions, Set<TblColRef> groups,
                                   Set<FunctionDesc> metrics, TupleFilter 
filter, TupleFilter havingFilter) {
         this.cubeDesc = cubeDesc;
@@ -158,4 +160,12 @@ public class StreamingSearchContext {
         sortedSet.addAll(cubeDesc.getMandatoryCuboids());
         return sortedSet;
     }
+
+    long getDeadline() {
+        return deadline;
+    }
+
+    public void setDeadline(long deadline) {
+        this.deadline =  deadline;
+    }
 }
diff --git 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
index c21afe5..8d6bda6 100644
--- 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
+++ 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.StorageSideBehavior;
@@ -81,7 +82,7 @@ public class DataController extends BasicController {
         }
         StreamingQueryProfile.set(queryProfile);
         logger.info("receive query request queryId:{}", queryId);
-        try {
+        try (SetThreadName changeName = new SetThreadName("Query %s", 
queryId)) {
             final Stopwatch sw = Stopwatch.createUnstarted();
             sw.start();
             String cubeName = dataRequest.getCubeName();
@@ -107,6 +108,7 @@ public class DataController extends BasicController {
 
             StreamingSearchContext gtSearchRequest = new 
StreamingSearchContext(cubeDesc, dimensions, groups,
                     metrics, tupleFilter, havingFilter);
+            gtSearchRequest.setDeadline(dataRequest.getDeadline());
             searchResult = dataSearcher.doSearch(gtSearchRequest, 
minSegmentTime,
                     dataRequest.isAllowStorageAggregation());
 

Reply via email to