[ 
https://issues.apache.org/jira/browse/DRILL-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022415#comment-16022415
 ] 

ASF GitHub Bot commented on DRILL-5485:
---------------------------------------

Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/829#discussion_r118160088
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
 ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.server.rest;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.DrillBuf;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelPromise;
    +import io.netty.channel.DefaultChannelPromise;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
    +import org.apache.drill.exec.proto.GeneralRPCProtos;
    +import org.apache.drill.exec.record.RecordBatchLoader;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.rpc.AbstractUserClientConnectionWrapper;
    +import org.apache.drill.exec.rpc.Acks;
    +import org.apache.drill.exec.rpc.ChannelClosedException;
    +import org.apache.drill.exec.rpc.ConnectionThrottle;
    +import org.apache.drill.exec.rpc.RpcOutcomeListener;
    +import org.apache.drill.exec.rpc.user.UserSession;
    +import org.apache.drill.exec.vector.ValueVector;
    +
    +import java.net.SocketAddress;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +public class WebUserConnection extends AbstractUserClientConnectionWrapper 
implements ConnectionThrottle {
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(WebUserConnection.class);
    +
    +  /**
    +   * WebUserConnectionWrapper which represents the UserClientConnection 
for the WebUser submitting the query. It provides
    +   * access to the UserSession executing the query. There is no actual 
physical channel corresponding to this connection
    +   * wrapper.
    +   */
    +
    +  protected BufferAllocator allocator;
    +
    +  protected SocketAddress remoteAddress;
    +
    +  protected UserSession webUserSession;
    +
    +  protected final ChannelPromise closeFuture;
    +
    +  public final List<Map<String, String>> results = Lists.newArrayList();
    +
    +  public final Set<String> columns = Sets.newLinkedHashSet();
    +
    +  WebUserConnection(BufferAllocator allocator, UserSession webUserSession,
    +                    SocketAddress remoteAddress, ChannelPromise 
closeFuture) {
    +    this.allocator = allocator;
    +    this.webUserSession = webUserSession;
    +    this.remoteAddress = remoteAddress;
    +    this.closeFuture = closeFuture;
    +  }
    +
    +  @Override
    +  public UserSession getSession() {
    +    return webUserSession;
    +  }
    +
    +  @Override
    +  public void sendData(RpcOutcomeListener<GeneralRPCProtos.Ack> listener, 
QueryWritableBatch result) {
    +
    +    // Check if there is any data or not. There can be overflow here but 
DrillBuf doesn't support allocating with
    +    // bytes in long. Hence we are just preserving the earlier behavior 
and logging debug log for the case.
    +    final int dataByteCount = (int) result.getByteCount();
    +
    +    if (dataByteCount <= 0) {
    +      if (logger.isDebugEnabled()) {
    +        logger.debug("Either no data received for this query or there is 
BufferOverflow in dataByteCount: {}",
    +            dataByteCount);
    +      }
    +      latch.countDown();
    +      listener.success(Acks.OK, null);
    +      return;
    +    }
    +
    +    // If here that means there is some data for sure. Create a ByteBuf 
with all the data in it.
    +    final int rows = result.getHeader().getRowCount();
    +    final DrillBuf bufferWithData = allocator.buffer(dataByteCount);
    +    try {
    +      final ByteBuf[] resultDataBuffers = result.getBuffers();
    +
    +      for (final ByteBuf buffer : resultDataBuffers) {
    +        bufferWithData.writeBytes(buffer);
    +        buffer.release();
    +      }
    +
    +      final RecordBatchLoader loader = new RecordBatchLoader(allocator);
    +      try {
    +        loader.load(result.getHeader().getDef(), bufferWithData);
    +        // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
    +        // SchemaChangeException, so check/clean catch clause below.
    +        for (int i = 0; i < loader.getSchema().getFieldCount(); ++i) {
    +          columns.add(loader.getSchema().getColumn(i).getPath());
    +        }
    +        for (int i = 0; i < rows; ++i) {
    +          final Map<String, String> record = Maps.newHashMap();
    +          for (VectorWrapper<?> vw : loader) {
    +            final String field = 
vw.getValueVector().getMetadata().getNamePart().getName();
    +            final ValueVector.Accessor accessor = 
vw.getValueVector().getAccessor();
    +            final Object value = i < accessor.getValueCount() ? 
accessor.getObject(i) : null;
    +            final String display = value == null ? null : value.toString();
    +            record.put(field, display);
    +          }
    +          results.add(record);
    +        }
    +      } finally {
    +        loader.clear();
    +      }
    +    } catch (Exception e) { // not catching OOM here
    +      exception = UserException.systemError(e).build(logger);
    +      latch.countDown();
    +    } finally {
    +      bufferWithData.release();
    +    }
    +
    +    // Notify the listener with ACK.OK because data is send successfully 
from Drillbit even though there is error found
    +    // during processing.
    +    listener.success(Acks.OK, null);
    +  }
    +
    +  @Override
    +  public ChannelFuture getChannelClosureFuture() {
    +    return closeFuture;
    +  }
    +
    +  @Override
    +  public SocketAddress getRemoteAddress() {
    +    return remoteAddress;
    +  }
    +
    +  @Override
    +  public void setAutoRead(boolean enableAutoRead) {
    +    // no-op
    +  }
    +
    +  /**
    +   * For anonymous WebUserSession it is called after each query request 
whereas for Authenticated WebUserSession
    +   * it is never called.
    --- End diff --
    
    Fixed.


> Remove WebServer dependency on DrillClient
> ------------------------------------------
>
>                 Key: DRILL-5485
>                 URL: https://issues.apache.org/jira/browse/DRILL-5485
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Web Server
>            Reporter: Sorabh Hamirwasia
>             Fix For: 1.11.0
>
>
> With encryption support using SASL, client's won't be able to authenticate 
> using PLAIN mechanism when encryption is enabled on the cluster. Today 
> WebServer which is embedded inside Drillbit creates a DrillClient instance 
> for each WebClient session. And the WebUser is authenticated as part of 
> authentication between DrillClient instance and Drillbit using PLAIN 
> mechanism. But with encryption enabled this will fail since encryption 
> doesn't support authentication using PLAN mechanism, hence no WebClient can 
> connect to a Drillbit. There are below issues as well with this approach:
> 1) Since DrillClient is used per WebUser session this is expensive as it has 
> heavyweight RPC layer for DrillClient and all it's dependencies. 
> 2) If the Foreman for a WebUser is also selected to be a different node then 
> there will be extra hop of transferring data back to WebClient.
> To resolve all the above issue it would be better to authenticate the WebUser 
> locally using the Drillbit on which WebServer is running without creating 
> DrillClient instance. We can use the local PAMAuthenticator to authenticate 
> the user. After authentication is successful the local Drillbit can also 
> serve as the Foreman for all the queries submitted by WebUser. This can be 
> achieved by submitting the query to the local Drillbit Foreman work queue. 
> This will also remove the requirement to encrypt the channel opened between 
> WebServer (DrillClient) and selected Drillbit since with this approach there 
> won't be any physical channel opened between them.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to