[ 
https://issues.apache.org/jira/browse/DRILL-6792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667214#comment-16667214
 ] 

ASF GitHub Bot commented on DRILL-6792:
---------------------------------------

weijietong commented on a change in pull request #1504: DRILL-6792: Find the 
right probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r228927761
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
 ##########
 @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final 
FragmentHandle handle) {
       return runningFragments.get(handle);
     }
 
+    /**
+     * receive the RuntimeFilter thorough the wire
+     * @param runtimeFilter
+     */
     public void receiveRuntimeFilter(final RuntimeFilterWritable 
runtimeFilter) {
       BitData.RuntimeFilterBDef runtimeFilterDef = 
runtimeFilter.getRuntimeFilterBDef();
       boolean toForeman = runtimeFilterDef.getToForeman();
       QueryId queryId = runtimeFilterDef.getQueryId();
       String queryIdStr = QueryIdHelper.getQueryId(queryId);
+      runtimeFilter.retainBuffers(1);
 
 Review comment:
   I have tried to transfer the buffers, but it doesn't work. The runtime 
filter was sent by the router ,but not gain a OK ack, and not found any error 
information reported. The only changed code is below:
   ```
         String queryIdStr = QueryIdHelper.getQueryId(queryId);
         //to foreman
         if (toForeman) {
           Foreman foreman = queries.get(queryId);
           BufferAllocator bufferAllocator = 
foreman.getQueryContext().getAllocator();
           RuntimeFilterWritable transferred = 
runtimeFilter.newRuntimeFilterWritable(bufferAllocator);
           if (foreman != null) {
             executor.execute(new Runnable() {
               @Override
               public void run() {
                 final Thread currentThread = Thread.currentThread();
                 final String originalName = currentThread.getName();
                 currentThread.setName(queryIdStr + 
":foreman:routeRuntimeFilter");
                 try {
                   //foreman.getRuntimeFilterRouter().register(runtimeFilter);
                   foreman.getRuntimeFilterRouter().register(transferred);
                 } catch (Exception e) {
                   logger.warn("Exception while registering the RuntimeFilter", 
e);
                 } finally {
                   currentThread.setName(originalName);
                   //runtimeFilter.close();
                   transferred.close();
                 }
               }
             });
           }
         } else {
   ```
   Could you give any help ?
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Find the right probe side fragment to any storage plugin
> --------------------------------------------------------
>
>                 Key: DRILL-6792
>                 URL: https://issues.apache.org/jira/browse/DRILL-6792
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Flow
>            Reporter: weijie.tong
>            Assignee: weijie.tong
>            Priority: Major
>             Fix For: 1.15.0
>
>
> The current implementation of JPPD to find the probe side wrapper depends on 
> the GroupScan's digest. But there's no promise the GroupScan's digest will 
> not be changed since it is attached to the RuntimeFilterDef by different 
> storage plugin implementation logic.So here we assign a unique identifier to 
> the RuntimeFilter operator, and find the right probe side fragment wrapper by 
> the runtime filter identifier at the RuntimeFilterRouter class. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to