This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b6de8d  NIFI-7025: Wrap Hive 3 calls with UGI.doAs Updated PutHive3Streaming to wrap calls to Hive in UGI.doAs methods Fixed misleading logging message after the principal has been authenticated with the KDC When connecting to unsecured Hive 3, a UGI with "simple" auth will be used
4b6de8d is described below

commit 4b6de8d164a2fe52d03fe06e751e2ece4ce7c680
Author: jstorck <jtsw...@gmail.com>
AuthorDate: Tue Mar 3 20:03:59 2020 -0500

    NIFI-7025: Wrap Hive 3 calls with UGI.doAs
    Updated PutHive3Streaming to wrap calls to Hive in UGI.doAs methods
    Fixed misleading logging message after the principal has been authenticated with the KDC
    When connecting to unsecured Hive 3, a UGI with "simple" auth will be used
    
    Signed-off-by: Matthew Burgess <mattyb...@apache.org>
    
    This closes #4108
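
    For readers unfamiliar with the doAs pattern named above, the shape of the change is roughly the sketch below. This is an illustration only, not code from this commit: the class name DoAsSketch and its standalone main() are placeholders, and the processor itself obtains its UGI through SecurityUtil during setup() rather than directly as shown here.

        import java.io.IOException;
        import java.security.PrivilegedAction;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.security.UserGroupInformation;

        public class DoAsSketch {
            public static void main(String[] args) throws IOException {
                // With no Kerberos settings this yields a UGI backed by "simple"
                // authentication, mirroring the unsecured-Hive path described above.
                UserGroupInformation.setConfiguration(new Configuration());
                final UserGroupInformation ugi = UserGroupInformation.getLoginUser();

                // The commit wraps the body of onTrigger() the same way, so the
                // Hive Streaming calls execute as the resolved user.
                ugi.doAs((PrivilegedAction<Void>) () -> {
                    System.out.println("running Hive calls as " + ugi.getShortUserName());
                    // ... open StreamingConnection, write records, commit transaction ...
                    return null;
                });
            }
        }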
---
 .../nifi/processors/hive/PutHive3Streaming.java    | 317 +++++++++++----------
 .../processors/hive/TestPutHive3Streaming.java     |  12 +-
 2 files changed, 184 insertions(+), 145 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index a1123d2..23b873f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -67,8 +67,10 @@ import org.apache.nifi.util.hive.HiveOptions;
 import org.apache.nifi.util.hive.HiveUtils;
 import org.apache.nifi.util.hive.ValidationResources;
 
+import javax.security.auth.login.LoginException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -321,7 +323,7 @@ public class PutHive3Streaming extends AbstractProcessor {
     }
 
     @OnScheduled
-    public void setup(final ProcessContext context) {
+    public void setup(final ProcessContext context) throws IOException {
         ComponentLog log = getLogger();
         rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
 
@@ -368,9 +370,9 @@ public class PutHive3Streaming extends AbstractProcessor {
                 throw new ProcessException(ae);
             }
 
-            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
+            log.info("Successfully logged in as principal " + resolvedPrincipal);
         } else {
-            ugi = null;
+            ugi = SecurityUtil.loginSimple(hiveConfig);
             kerberosUserReference.set(null);
         }
 
@@ -381,172 +383,181 @@ public class PutHive3Streaming extends AbstractProcessor {
     }
 
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
-        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-
-        final ComponentLog log = getLogger();
-        String metastoreURIs = null;
-        if (context.getProperty(METASTORE_URI).isSet()) {
-            metastoreURIs = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
-            if (StringUtils.isEmpty(metastoreURIs)) {
-                // Shouldn't be empty at this point, log an error, penalize the flow file, and return
-                log.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure");
-                session.transfer(session.penalize(flowFile), REL_FAILURE);
+        getUgi().doAs((PrivilegedAction<Void>) () -> {
+            FlowFile flowFile = session.get();
+            if (flowFile == null) {
+                return null;
             }
-        }
-
-        final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
-        final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
-
-        // Override the Hive Metastore URIs in the config if set by the user
-        if (metastoreURIs != null) {
-           hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
-        }
 
-        HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
-                .withHiveConf(hiveConfig)
-                .withCallTimeout(callTimeout)
-                .withStreamingOptimizations(!disableStreamingOptimizations);
+            final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+            final ComponentLog log = getLogger();
+            String metastoreURIs = null;
+            if (context.getProperty(METASTORE_URI).isSet()) {
+                metastoreURIs = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
+                if (StringUtils.isEmpty(metastoreURIs)) {
+                    // Shouldn't be empty at this point, log an error, penalize the flow file, and return
+                    log.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure");
+                    session.transfer(session.penalize(flowFile), REL_FAILURE);
+                }
+            }
 
-        if (!StringUtils.isEmpty(staticPartitionValuesString)) {
-            List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
-            o = o.withStaticPartitionValues(staticPartitionValues);
-        }
+            final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
+            final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
 
-        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
-            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-            o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
-        }
+            // Override the Hive Metastore URIs in the config if set by the user
+            if (metastoreURIs != null) {
+               hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
+            }
 
-        final HiveOptions options = o;
+            HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
+                    .withHiveConf(hiveConfig)
+                    .withCallTimeout(callTimeout)
+                    .withStreamingOptimizations(!disableStreamingOptimizations);
 
-        // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
-        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+            if (!StringUtils.isEmpty(staticPartitionValuesString)) {
+                List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
+                o = o.withStaticPartitionValues(staticPartitionValues);
+            }
 
-        StreamingConnection hiveStreamingConnection = null;
+            if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+                final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+                final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+                final String resolvedPrincipal;
+                if (credentialsService != null) {
+                    resolvedPrincipal = credentialsService.getPrincipal();
+                    o = o.withKerberosKeytab(credentialsService.getKeytab());
+                } else resolvedPrincipal = explicitPrincipal;
+                o = o.withKerberosPrincipal(resolvedPrincipal);
+            }
 
-        try {
-            final RecordReader reader;
-
-            try(final InputStream in = session.read(flowFile)) {
-                // if we fail to create the RecordReader then we want to route to failure, so we need to
-                // handle this separately from the other IOExceptions which normally route to retry
-                try {
-                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
-                } catch (Exception e) {
-                    throw new RecordReaderFactoryException("Unable to create RecordReader", e);
-                }
+            final HiveOptions options = o;
 
-                hiveStreamingConnection = makeStreamingConnection(options, reader);
+            // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
+            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
 
-                // Write records to Hive streaming, then commit and close
-                hiveStreamingConnection.beginTransaction();
-                hiveStreamingConnection.write(in);
-                hiveStreamingConnection.commitTransaction();
-                in.close();
+            StreamingConnection hiveStreamingConnection = null;
 
-                Map<String, String> updateAttributes = new HashMap<>();
-                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
-                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
-                flowFile = session.putAllAttributes(flowFile, updateAttributes);
-                session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
-            } catch (TransactionError te) {
-                if (rollbackOnFailure) {
-                    throw new ProcessException(te.getLocalizedMessage(), te);
-                } else {
-                    throw new ShouldRetryException(te.getLocalizedMessage(), te);
+            try {
+                final RecordReader reader;
+
+                try(final InputStream in = session.read(flowFile)) {
+                    // if we fail to create the RecordReader then we want to route to failure, so we need to
+                    // handle this separately from the other IOExceptions which normally route to retry
+                    try {
+                        reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
+                    } catch (Exception e) {
+                        throw new RecordReaderFactoryException("Unable to create RecordReader", e);
+                    }
+
+                    hiveStreamingConnection = makeStreamingConnection(options, reader);
+
+                    // Write records to Hive streaming, then commit and close
+                    hiveStreamingConnection.beginTransaction();
+                    hiveStreamingConnection.write(in);
+                    hiveStreamingConnection.commitTransaction();
+                    in.close();
+
+                    Map<String, String> updateAttributes = new HashMap<>();
+                    updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
+                    updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                    flowFile = session.putAllAttributes(flowFile, updateAttributes);
+                    session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
+                } catch (TransactionError te) {
+                    if (rollbackOnFailure) {
+                        throw new ProcessException(te.getLocalizedMessage(), te);
+                    } else {
+                        throw new ShouldRetryException(te.getLocalizedMessage(), te);
+                    }
+                } catch (RecordReaderFactoryException rrfe) {
+                    if (rollbackOnFailure) {
+                        throw new ProcessException(rrfe);
+                    } else {
+                        log.error(
+                                "Failed to create {} for {} - routing to failure",
+                                new Object[]{RecordReader.class.getSimpleName(), flowFile},
+                                rrfe
+                        );
+                        session.transfer(flowFile, REL_FAILURE);
+                        return null;
+                    }
                 }
-            } catch (RecordReaderFactoryException rrfe) {
+                session.transfer(flowFile, REL_SUCCESS);
+            } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
                 if (rollbackOnFailure) {
-                    throw new ProcessException(rrfe);
+                    if (hiveStreamingConnection != null) {
+                        abortConnection(hiveStreamingConnection);
+                    }
+                    throw new ProcessException(e.getLocalizedMessage(), e);
                 } else {
+                    Map<String, String> updateAttributes = new HashMap<>();
+                    final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
+                    updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
+                    updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                    flowFile = session.putAllAttributes(flowFile, updateAttributes);
                     log.error(
-                            "Failed to create {} for {} - routing to failure",
-                            new Object[]{RecordReader.class.getSimpleName(), flowFile},
-                            rrfe
+                            "Exception while processing {} - routing to failure",
+                            new Object[]{flowFile},
+                            e
                     );
                     session.transfer(flowFile, REL_FAILURE);
-                    return;
                 }
-            }
-            session.transfer(flowFile, REL_SUCCESS);
-        } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
-            if (rollbackOnFailure) {
+            } catch (DiscontinuedException e) {
+                // The input FlowFile processing is discontinued. Keep it in the input queue.
+                getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
+                session.transfer(flowFile, Relationship.SELF);
+            } catch (ConnectionError ce) {
+                // If we can't connect to the metastore, yield the processor
+                context.yield();
+                throw new ProcessException("A connection to metastore cannot be established", ce);
+            } catch (ShouldRetryException e) {
+                // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
+                getLogger().error(e.getLocalizedMessage(), e);
                 if (hiveStreamingConnection != null) {
                     abortConnection(hiveStreamingConnection);
                 }
-                throw new ProcessException(e.getLocalizedMessage(), e);
-            } else {
-                Map<String, String> updateAttributes = new HashMap<>();
-                final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
-                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
-                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
-                flowFile = session.putAllAttributes(flowFile, updateAttributes);
-                log.error(
-                        "Exception while processing {} - routing to failure",
-                        new Object[]{flowFile},
-                        e
-                );
-                session.transfer(flowFile, REL_FAILURE);
-            }
-        } catch (DiscontinuedException e) {
-            // The input FlowFile processing is discontinued. Keep it in the input queue.
-            getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
-            session.transfer(flowFile, Relationship.SELF);
-        } catch (ConnectionError ce) {
-            // If we can't connect to the metastore, yield the processor
-            context.yield();
-            throw new ProcessException("A connection to metastore cannot be established", ce);
-        } catch (ShouldRetryException e) {
-            // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
-            getLogger().error(e.getLocalizedMessage(), e);
-            if (hiveStreamingConnection != null) {
-                abortConnection(hiveStreamingConnection);
-            }
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_RETRY);
-        } catch (StreamingException se) {
-            // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
-            Throwable cause = se.getCause();
-            if (cause == null) cause = se;
-            // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
-            if (rollbackOnFailure) {
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_RETRY);
+            } catch (StreamingException se) {
+                // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
+                Throwable cause = se.getCause();
+                if (cause == null) cause = se;
+                // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
+                if (rollbackOnFailure) {
+                    if (hiveStreamingConnection != null) {
+                        abortConnection(hiveStreamingConnection);
+                    }
+                    throw new ProcessException(cause.getLocalizedMessage(), cause);
+                } else {
+                    flowFile = session.penalize(flowFile);
+                    Map<String, String> updateAttributes = new HashMap<>();
+                    final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
+                    updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
+                    updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                    flowFile = session.putAllAttributes(flowFile, updateAttributes);
+                    log.error(
+                            "Exception while trying to stream {} to hive - routing to failure",
+                            new Object[]{flowFile},
+                            se
+                    );
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+
+            } catch (Throwable t) {
                 if (hiveStreamingConnection != null) {
                     abortConnection(hiveStreamingConnection);
                 }
-                throw new ProcessException(cause.getLocalizedMessage(), cause);
-            } else {
-                flowFile = session.penalize(flowFile);
-                Map<String, String> updateAttributes = new HashMap<>();
-                final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
-                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
-                updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
-                flowFile = session.putAllAttributes(flowFile, updateAttributes);
-                log.error(
-                        "Exception while trying to stream {} to hive - routing to failure",
-                        new Object[]{flowFile},
-                        se
-                );
-                session.transfer(flowFile, REL_FAILURE);
-            }
-
-        } catch (Throwable t) {
-            if (hiveStreamingConnection != null) {
-                abortConnection(hiveStreamingConnection);
+                throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
+            } finally {
+                closeConnection(hiveStreamingConnection);
+                // Restore original class loader, might not be necessary but is good practice since the processor task changed it
+                Thread.currentThread().setContextClassLoader(originalClassloader);
             }
-            throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
-        } finally {
-            closeConnection(hiveStreamingConnection);
-            // Restore original class loader, might not be necessary but is good practice since the processor task changed it
-            Thread.currentThread().setContextClassLoader(originalClassloader);
-        }
+            return null;
+        });
     }
 
     StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
@@ -623,5 +634,23 @@ public class PutHive3Streaming extends AbstractProcessor {
             super(message, cause);
         }
     }
+
+    UserGroupInformation getUgi() {
+        getLogger().trace("getting UGI instance");
+        if (kerberosUserReference.get() != null) {
+            // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
+            KerberosUser kerberosUser = kerberosUserReference.get();
+            getLogger().debug("kerberosUser is " + kerberosUser);
+            try {
+                getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
+                kerberosUser.checkTGTAndRelogin();
+            } catch (LoginException e) {
+                throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
+            }
+        } else {
+            getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
+        }
+        return ugi;
+    }
 }
 
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index d9113fe..edaefe8 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -155,7 +155,7 @@ public class TestPutHive3Streaming {
         System.setProperty("java.security.krb5.kdc", "nifi.kdc");
 
         ugi = null;
-        processor = new MockPutHive3Streaming();
+        processor = new MockPutHive3Streaming(ugi);
         hiveConfigurator = mock(HiveConfigurator.class);
         hiveConf = new HiveConf();
         when(hiveConfigurator.getConfigurationFromFiles(anyString())).thenReturn(hiveConf);
@@ -272,6 +272,7 @@ public class TestPutHive3Streaming {
         runner.setProperty(PutHive3Streaming.DB_NAME, "default");
         runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
         processor.ugi = mock(UserGroupInformation.class);
+        processor.kerberosUserReference.set(mock(KerberosUser.class));
         runner.run();
         assertNull(processor.ugi);
         assertNull(processor.kerberosUserReference.get());
@@ -1128,6 +1129,10 @@ public class TestPutHive3Streaming {
                 new FieldSchema("scale", serdeConstants.DOUBLE_TYPE_NAME, "")
         );
 
+        private MockPutHive3Streaming(UserGroupInformation ugi) {
+            this.ugi = ugi;
+        }
+
         @Override
         StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
 
@@ -1175,6 +1180,11 @@ public class TestPutHive3Streaming {
         public void setGeneratePermissionsFailure(boolean generatePermissionsFailure) {
             this.generatePermissionsFailure = generatePermissionsFailure;
         }
+
+        @Override
+        UserGroupInformation getUgi() {
+            return ugi;
+        }
     }
 
     private class MockHiveStreamingConnection implements StreamingConnection {
