This is an automated email from the ASF dual-hosted git repository.

mgaido pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new d0d8028  [LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT][FOLLOWUP] Use ResultSet in catalog operations
d0d8028 is described below

commit d0d8028657c7314ad031f51c0a564a8786213187
Author: Marco Gaido <mga...@apache.org>
AuthorDate: Fri Aug 30 14:06:08 2019 +0200

    [LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT][FOLLOWUP] Use ResultSet in catalog operations
    
    ## What changes were proposed in this pull request?
    
    This is a follow-up of #194 which addresses all the remaining concerns. The main changes are:
    
     - reverting the introduction of a state specific to catalog operations;
     - using `ResultSet` to send the data for catalog operations over the wire as well (see the sketch below).
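
    A minimal sketch of the resulting fetch path, mirroring the new
    `SparkCatalogOperation.getNextRowSet` in the diff below (the class's
    existing `rscClient`, `sessionId` and `jobId` members are assumed):

        override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long): ThriftResultSet = {
          validateFetchOrientation(orientation)
          assertState(Seq(OperationState.FINISHED))
          setHasResultSet(true)
          val maxRows = maxRowsL.toInt
          // FetchResultJob returns the rows already wrapped in a ResultSet,
          // so the server no longer converts them row by row before replying.
          val results = rscClient.submit(new FetchResultJob(sessionId, jobId, maxRows)).get()
          ThriftResultSet(results)
        }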
    
    ## How was this patch tested?
    
    Existing UTs, modified accordingly.
    
    Author: Marco Gaido <mga...@apache.org>
    
    Closes #217 from mgaido91/LIVY-622_followup.
---
 .../livy/thriftserver/LivyOperationManager.scala   | 18 ++---
 .../operation/GetColumnsOperation.scala            | 20 +++---
 .../operation/GetFunctionsOperation.scala          | 35 ++-------
 .../operation/GetSchemasOperation.scala            | 12 ++--
 .../operation/GetTablesOperation.scala             | 16 ++---
 .../thriftserver/operation/MetadataOperation.scala |  3 -
 .../operation/SparkCatalogOperation.scala          | 51 ++------------
 .../session/CleanupCatalogResultJob.java           | 37 ----------
 .../session/FetchCatalogResultJob.java             | 51 --------------
 .../livy/thriftserver/session/GetColumnsJob.java   | 31 ++++----
 .../livy/thriftserver/session/GetFunctionsJob.java | 66 ++++++++++++++---
 .../livy/thriftserver/session/GetSchemasJob.java   | 24 ++++---
 .../livy/thriftserver/session/GetTablesJob.java    | 19 ++---
 .../livy/thriftserver/session/SparkCatalogJob.java | 64 +++++++++++++++--
 .../livy/thriftserver/session/SparkUtils.java      | 46 ++++++++++++
 .../thriftserver/session/ThriftSessionState.java   | 32 ---------
 .../thriftserver/session/ThriftSessionTest.java    | 82 +++++++++++++---------
 17 files changed, 290 insertions(+), 317 deletions(-)

diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
index 2454185..eb1dd21 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
@@ -196,12 +196,7 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
       tableTypes: util.List[String]): OperationHandle = {
     executeOperation(sessionHandle, {
       val op = new GetTablesOperation(
-        sessionHandle,
-        catalogName,
-        schemaName,
-        tableName,
-        tableTypes,
-        livyThriftSessionManager)
+        sessionHandle, schemaName, tableName, tableTypes, livyThriftSessionManager)
       addOperation(op, sessionHandle)
       op
     })
@@ -214,8 +209,8 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
       schemaName: String,
       functionName: String): OperationHandle = {
     executeOperation(sessionHandle, {
-      val op = new GetFunctionsOperation(sessionHandle, catalogName, schemaName, functionName,
-        livyThriftSessionManager)
+      val op = new GetFunctionsOperation(
+        sessionHandle, schemaName, functionName, livyThriftSessionManager)
       addOperation(op, sessionHandle)
       op
     })
@@ -227,8 +222,7 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
       catalogName: String,
       schemaName: String): OperationHandle = {
     executeOperation(sessionHandle, {
-      val op = new GetSchemasOperation(sessionHandle, catalogName, schemaName,
-        livyThriftSessionManager)
+      val op = new GetSchemasOperation(sessionHandle, schemaName, livyThriftSessionManager)
       addOperation(op, sessionHandle)
       op
     })
@@ -242,8 +236,8 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
       tableName: String,
       columnName: String): OperationHandle = {
     executeOperation(sessionHandle, {
-      val op = new GetColumnsOperation(sessionHandle, catalogName, schemaName, tableName,
-        columnName, livyThriftSessionManager)
+      val op = new GetColumnsOperation(
+        sessionHandle, schemaName, tableName, columnName, livyThriftSessionManager)
       addOperation(op, sessionHandle)
       op
     })
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala
index c9c106c..ae62679 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala
@@ -26,10 +26,9 @@ import org.apache.livy.thriftserver.session.{GetColumnsJob, GetFunctionsJob}
 
 class GetColumnsOperation(
     sessionHandle: SessionHandle,
-    catalogName: String,
-    schemaName: String,
-    tableName: String,
-    columnName: String,
+    schemaPattern: String,
+    tablePattern: String,
+    columnPattern: String,
     sessionManager: LivyThriftSessionManager)
   extends SparkCatalogOperation(
     sessionHandle, OperationType.GET_COLUMNS, sessionManager) with Logging {
@@ -39,12 +38,12 @@ class GetColumnsOperation(
     setState(OperationState.RUNNING)
     try {
       rscClient.submit(new GetColumnsJob(
-        convertSchemaPattern(schemaName),
-        convertIdentifierPattern(tableName, datanucleusFormat = true),
-        Option(columnName).map { convertIdentifierPattern(_, datanucleusFormat = false) }.orNull,
+        schemaPattern,
+        tablePattern,
+        columnPattern,
         sessionId,
-        jobId
-      )).get()
+        jobId,
+        GetColumnsOperation.SCHEMA.fields.map(_.fieldType.dataType))).get()
 
       setState(OperationState.FINISHED)
     } catch {
@@ -97,6 +96,5 @@ object GetColumnsOperation {
       "user-generated Ref type, SQL type from java.sql.Types (null if 
DATA_TYPE isn't " +
       "DISTINCT or user-generated REF)"),
     Field("IS_AUTO_INCREMENT", BasicDataType("string"), "Indicates whether 
this column is " +
-      "auto incremented.")
-  )
+      "auto incremented."))
 }
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala
index 0e43f16..4648f40 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala
@@ -26,9 +26,8 @@ import org.apache.livy.thriftserver.LivyThriftSessionManager
 
 class GetFunctionsOperation(
     sessionHandle: SessionHandle,
-    catalogName: String,
-    schemaName: String,
-    functionName: String,
+    schemaPattern: String,
+    functionSQLSearch: String,
     sessionManager: LivyThriftSessionManager)
   extends SparkCatalogOperation(
     sessionHandle, OperationType.GET_FUNCTIONS, sessionManager) with Logging {
@@ -38,11 +37,11 @@ class GetFunctionsOperation(
     setState(OperationState.RUNNING)
     try {
       rscClient.submit(new GetFunctionsJob(
-        convertSchemaPattern(schemaName),
-        convertFunctionName(functionName),
+        schemaPattern,
+        functionSQLSearch,
         sessionId,
-        jobId
-      )).get()
+        jobId,
+        GetFunctionsOperation.SCHEMA.fields.map(_.fieldType.dataType))).get()
 
       setState(OperationState.FINISHED)
     } catch {
@@ -58,25 +57,6 @@ class GetFunctionsOperation(
     assertState(Seq(OperationState.FINISHED))
     GetFunctionsOperation.SCHEMA
   }
-
-  private def convertFunctionName(name: String): String = {
-    if (name == null) {
-      ".*"
-    } else {
-      var escape = false
-      name.flatMap {
-        case c if escape =>
-          if (c != '\\') escape = false
-          c.toString
-        case '\\' =>
-          escape = true
-          ""
-        case '%' => ".*"
-        case '_' => "."
-        case c => Character.toLowerCase(c).toString
-      }
-    }
-  }
 }
 
 object GetFunctionsOperation {
@@ -89,6 +69,5 @@ object GetFunctionsOperation {
     Field("FUNCTION_TYPE", BasicDataType("integer"),
       "Kind of function."),
     Field("SPECIFIC_NAME", BasicDataType("string"),
-      "The name which uniquely identifies this function within its schema")
-  )
+      "The name which uniquely identifies this function within its schema"))
 }
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala
index 6bd0a17..95fb11a 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala
@@ -26,8 +26,7 @@ import org.apache.livy.thriftserver.session.{GetSchemasJob, GetTablesJob}
 
 class GetSchemasOperation(
     sessionHandle: SessionHandle,
-    catalogName: String,
-    schemaName: String,
+    schemaPattern: String,
     sessionManager: LivyThriftSessionManager)
   extends SparkCatalogOperation(
     sessionHandle, OperationType.GET_SCHEMAS, sessionManager) with Logging {
@@ -37,8 +36,10 @@ class GetSchemasOperation(
     setState(OperationState.RUNNING)
     try {
       rscClient.submit(new GetSchemasJob(
-        convertSchemaPattern(schemaName), sessionId, jobId
-      )).get()
+        schemaPattern,
+        sessionId,
+        jobId,
+        GetSchemasOperation.SCHEMA.fields.map(_.fieldType.dataType))).get()
       setState(OperationState.FINISHED)
     } catch {
       case e: Throwable =>
@@ -58,6 +59,5 @@ class GetSchemasOperation(
 object GetSchemasOperation {
   val SCHEMA = Schema(
     Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."),
-    Field("TABLE_CATALOG", BasicDataType("string"), "Catalog name.")
-  )
+    Field("TABLE_CATALOG", BasicDataType("string"), "Catalog name."))
 }
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala
index 4a939b3..3492724 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala
@@ -26,9 +26,8 @@ import org.apache.livy.thriftserver.session.GetTablesJob
 
 class GetTablesOperation(
     sessionHandle: SessionHandle,
-    catalogName: String,
-    schemaName: String,
-    tableName: String,
+    schemaPattern: String,
+    tablePattern: String,
     tableTypes: java.util.List[String],
     sessionManager: LivyThriftSessionManager)
   extends SparkCatalogOperation(
@@ -39,12 +38,12 @@ class GetTablesOperation(
     setState(OperationState.RUNNING)
     try {
       rscClient.submit(new GetTablesJob(
-        convertSchemaPattern(schemaName),
-        convertIdentifierPattern(tableName, datanucleusFormat = true),
+        schemaPattern,
+        tablePattern,
         tableTypes,
         sessionId,
-        jobId
-      )).get()
+        jobId,
+        GetTablesOperation.SCHEMA.fields.map(_.fieldType.dataType))).get()
 
       setState(OperationState.FINISHED)
     } catch {
@@ -68,6 +67,5 @@ object GetTablesOperation {
     Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."),
     Field("TABLE_NAME", BasicDataType("string"), "Table name."),
     Field("TABLE_TYPE", BasicDataType("string"), "The table type, e.g. 
\"TABLE\", \"VIEW\", etc."),
-    Field("REMARKS", BasicDataType("string"), "Comments about the table.")
-  )
+    Field("REMARKS", BasicDataType("string"), "Comments about the table."))
 }
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala
index 48c17cc..6f70f02 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala
@@ -23,9 +23,6 @@ import org.apache.livy.thriftserver.serde.ThriftResultSet
 
 /**
  * MetadataOperation is the base class for operations which do not perform any call on Spark side
-  *
-  * @param sessionHandle
-  * @param opType
   */
abstract class MetadataOperation(sessionHandle: SessionHandle, opType: OperationType)
   extends Operation(sessionHandle, opType) {
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala
index 9ed31f7..2e2ecc5 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala
@@ -17,12 +17,11 @@
 
 package org.apache.livy.thriftserver.operation
 
-import org.apache.commons.lang.StringUtils
 import org.apache.hive.service.cli._
 
 import org.apache.livy.thriftserver.LivyThriftSessionManager
 import org.apache.livy.thriftserver.serde.ThriftResultSet
-import org.apache.livy.thriftserver.session.{CleanupCatalogResultJob, FetchCatalogResultJob}
+import org.apache.livy.thriftserver.session.{CleanupStatementJob, FetchResultJob}
 
 /**
 * SparkCatalogOperation is the base class for operations which need to fetch catalog information
@@ -51,7 +50,7 @@ abstract class SparkCatalogOperation(
 
   @throws[HiveSQLException]
   override def close(): Unit = {
-    val cleaned = rscClient.submit(new CleanupCatalogResultJob(sessionId, jobId)).get()
+    val cleaned = rscClient.submit(new CleanupStatementJob(sessionId, jobId)).get()
     if (!cleaned) {
       warn(s"Fail to cleanup fetch catalog job (session = ${sessionId}), " +
         "this message can be ignored if the job failed.")
@@ -66,54 +65,12 @@ abstract class SparkCatalogOperation(
     throw new UnsupportedOperationException("SparkCatalogOperation.cancel()")
   }
 
-  /**
-   * Convert wildchars and escape sequence from JDBC format to datanucleous/regex
-   *
-   * This is ported from Spark Hive Thrift MetaOperation.
-   */
-  protected def convertIdentifierPattern(pattern: String, datanucleusFormat: Boolean): String = {
-    if (pattern == null) {
-      convertPattern("%", datanucleusFormat = true)
-    } else {
-      convertPattern(pattern, datanucleusFormat)
-    }
-  }
-
-  /**
-   * Convert wildchars and escape sequence of schema pattern from JDBC format to datanucleous/regex
-   * The schema pattern treats empty string also as wildchar.
-   *
-   * This is ported from Spark Hive Thrift MetaOperation.
-   */
-  protected def convertSchemaPattern(pattern: String): String = {
-    if (StringUtils.isEmpty(pattern)) {
-      convertPattern("%", datanucleusFormat = true)
-    } else {
-      convertPattern(pattern, datanucleusFormat = true)
-    }
-  }
-
-  private def convertPattern(pattern: String, datanucleusFormat: Boolean): String = {
-    val wStr = if (datanucleusFormat) "*" else ".*"
-    pattern
-      .replaceAll("([^\\\\])%", "$1" + wStr)
-      .replaceAll("\\\\%", "%")
-      .replaceAll("^%", wStr)
-      .replaceAll("([^\\\\])_", "$1.")
-      .replaceAll("\\\\_", "_")
-      .replaceAll("^_", ".")
-  }
-
   override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long): ThriftResultSet = {
     validateFetchOrientation(orientation)
     assertState(Seq(OperationState.FINISHED))
     setHasResultSet(true)
     val maxRows = maxRowsL.toInt
-    val results = rscClient.submit(new FetchCatalogResultJob(sessionId, jobId, maxRows)).get()
-
-    val rowSet = ThriftResultSet.apply(getResultSetSchema, protocolVersion)
-    import scala.collection.JavaConverters._
-    results.asScala.foreach { r => rowSet.addRow(r.asInstanceOf[Array[Any]]) }
-    return rowSet
+    val results = rscClient.submit(new FetchResultJob(sessionId, jobId, maxRows)).get()
+    ThriftResultSet(results)
   }
 }
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java
deleted file mode 100644
index b9444ca..0000000
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.thriftserver.session;
-
-import org.apache.livy.Job;
-import org.apache.livy.JobContext;
-
-public class CleanupCatalogResultJob implements Job<Boolean> {
-  private final String sessionId;
-  private final String jobId;
-
-  public CleanupCatalogResultJob(String sessionId, String jobId) {
-    this.sessionId = sessionId;
-    this.jobId = jobId;
-  }
-
-  @Override
-  public Boolean call(JobContext jc) throws Exception {
-    ThriftSessionState session = ThriftSessionState.get(jc, sessionId);
-      return session.cleanupCatalogJob(jobId);
-    }
-}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java
deleted file mode 100644
index 9654b02..0000000
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.thriftserver.session;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.livy.Job;
-import org.apache.livy.JobContext;
-
-public class FetchCatalogResultJob implements Job<List<Object[]>> {
-  private final String sessionId;
-  private final String jobId;
-  private final int maxRows;
-
-  public FetchCatalogResultJob(String sessionId, String jobId, int maxRows) {
-    this.sessionId = sessionId;
-    this.jobId = jobId;
-    this.maxRows = maxRows;
-  }
-
-  @Override
-  public List<Object[]> call(JobContext jc) throws Exception {
-    ThriftSessionState session = ThriftSessionState.get(jc, sessionId);
-    Iterator<Object[]> iterator = session.findCatalogJob(jobId).iter;
-
-    List<Object[]> result = new ArrayList<>();
-    int n = 0;
-    while (iterator.hasNext() && n < maxRows) {
-      result.add(iterator.next());
-      n += 1;
-    }
-    return result;
-  }
-}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java
index bc2bd73..b5a0cba 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java
@@ -22,9 +22,11 @@ import java.util.List;
 
 import static scala.collection.JavaConversions.seqAsJavaList;
 
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.sql.types.StructField;
 
 public class GetColumnsJob extends SparkCatalogJob {
@@ -33,20 +35,21 @@ public class GetColumnsJob extends SparkCatalogJob {
   private final String columnPattern;
 
   public GetColumnsJob(
-    String databasePattern,
-    String tablePattern,
-    String columnPattern,
-    String sessionId,
-    String jobId) {
-      super(sessionId, jobId);
-      this.databasePattern = databasePattern;
-      this.tablePattern = tablePattern;
-      this.columnPattern = columnPattern;
+      String databasePattern,
+      String tablePattern,
+      String columnPattern,
+      String sessionId,
+      String jobId,
+      DataType[] resultTypes) {
+    super(sessionId, jobId, resultTypes);
+    this.databasePattern = convertSchemaPattern(databasePattern);
+    this.tablePattern = convertIdentifierPattern(tablePattern, true);
+    this.columnPattern = convertIdentifierPattern(columnPattern, false);
   }
 
   @Override
-  protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
-    List<Object[]> columnList = new ArrayList<Object[]>();
+  protected List<Row> fetchCatalogObjects(SessionCatalog catalog) {
+    List<Row> columnList = new ArrayList<>();
     List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern));
 
     for (String db : databases) {
@@ -57,8 +60,8 @@ public class GetColumnsJob extends SparkCatalogJob {
         List<StructField> fields = seqAsJavaList(table.schema());
         int position = 0;
         for (StructField field : fields) {
-          if (columnPattern == null || field.name().matches(columnPattern)) {
-            columnList.add(new Object[] {
+          if (field.name().matches(columnPattern)) {
+            columnList.add(new GenericRow(new Object[] {
               DEFAULT_HIVE_CATALOG,
               table.database(),
               table.identifier().table(),
@@ -82,7 +85,7 @@ public class GetColumnsJob extends SparkCatalogJob {
               null, // SCOPE_TABLE
               null, // SOURCE_DATA_TYPE
               "NO" // IS_AUTO_INCREMENT
-            });
+            }));
             position += 1;
           }
         }
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java
index 297fb80..e5e383a 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java
@@ -23,45 +23,89 @@ import java.util.List;
 import scala.Tuple2;
 import static scala.collection.JavaConversions.seqAsJavaList;
 
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.FunctionIdentifier;
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
 
 public class GetFunctionsJob extends SparkCatalogJob {
+  private static final char SEARCH_STRING_ESCAPE = '\\';
   private final String databasePattern;
-  private final String functionName;
+  private final String functionRegex;
 
   public GetFunctionsJob(
       String databasePattern,
-      String functionName,
+      String functionSQLSearch,
       String sessionId,
-      String jobId) {
-    super(sessionId, jobId);
-    this.databasePattern = databasePattern;
-    this.functionName = functionName;
+      String jobId,
+      DataType[] resultTypes) {
+    super(sessionId, jobId, resultTypes);
+    // Spark is using regex to filter the function name, instead of SQL search patterns,
+    // so we need to convert them
+    this.databasePattern = convertSchemaPattern(databasePattern);
+    this.functionRegex = patternToRegex(functionSQLSearch);
   }
 
   @Override
-  protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
-    List<Object[]> funcList = new ArrayList<Object[]>();
+  protected List<Row> fetchCatalogObjects(SessionCatalog catalog) {
+    List<Row> funcList = new ArrayList<>();
 
     List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern));
     for (String db : databases) {
       List<Tuple2<FunctionIdentifier, String>> identifiersTypes =
-        seqAsJavaList(catalog.listFunctions(db, functionName));
+        seqAsJavaList(catalog.listFunctions(db, functionRegex));
       for (Tuple2<FunctionIdentifier, String> identifierType : identifiersTypes) {
         FunctionIdentifier function = identifierType._1;
         ExpressionInfo info = catalog.lookupFunctionInfo(function);
-        funcList.add(new Object[] {
+        funcList.add(new GenericRow(new Object[] {
           null,
           function.database().isDefined() ? function.database().get() : null,
           function.funcName(),
           info.getUsage() + info.getExtended(),
           null,
           info.getClassName()
-        });
+        }));
       }
     }
     return funcList;
   }
+
+  /**
+   * Ported from Spark's CLIServiceUtils.
+   * Converts a SQL search pattern into an equivalent Java Regex.
+   *
+   * @param pattern input which may contain '%' or '_' wildcard characters, or
+   *                these characters escaped using { @code getSearchStringEscape()}.
+   * @return replace %/_ with regex search characters, also handle escaped characters.
+   */
+  private String patternToRegex(String pattern) {
+    if (pattern == null) {
+      return ".*";
+    } else {
+      StringBuilder result = new StringBuilder(pattern.length());
+
+      boolean escaped = false;
+      for (int i = 0, len = pattern.length(); i < len; i++) {
+        char c = pattern.charAt(i);
+        if (escaped) {
+          if (c != SEARCH_STRING_ESCAPE) {
+            escaped = false;
+          }
+          result.append(c);
+        } else {
+          if (c == SEARCH_STRING_ESCAPE) {
+            escaped = true;
+          } else if (c == '%') {
+            result.append(".*");
+          } else if (c == '_') {
+            result.append('.');
+          } else {
+            result.append(Character.toLowerCase(c));
+          }
+        }
+      }
+      return result.toString();
+    }
+  }
 }
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java
index 451bea2..59c4cca 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java
@@ -22,25 +22,31 @@ import java.util.List;
 
 import static scala.collection.JavaConversions.seqAsJavaList;
 
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
 
 public class GetSchemasJob extends SparkCatalogJob {
-  private final String schemaName;
+  private final String schemaPattern;
 
-  public GetSchemasJob(String schemaName, String sessionId, String jobId) {
-    super(sessionId, jobId);
-    this.schemaName = schemaName;
+  public GetSchemasJob(
+      String schemaPattern,
+      String sessionId,
+      String jobId,
+      DataType[] resultTypes) {
+    super(sessionId, jobId, resultTypes);
+    this.schemaPattern = convertSchemaPattern(schemaPattern);
   }
 
   @Override
-  protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
-    List<String> databases = seqAsJavaList(catalog.listDatabases(schemaName));
-    List<Object[]> schemas = new ArrayList<>();
+  protected List<Row> fetchCatalogObjects(SessionCatalog catalog) {
+    List<String> databases = seqAsJavaList(catalog.listDatabases(schemaPattern));
+    List<Row> schemas = new ArrayList<>();
     for (String db : databases) {
-      schemas.add(new Object[] {
+      schemas.add(new GenericRow(new Object[] {
         db,
         DEFAULT_HIVE_CATALOG,
-      });
+      }));
     }
     return schemas;
   }
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java
index a071aef..d3c6b53 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java
@@ -22,10 +22,12 @@ import java.util.List;
 
 import static scala.collection.JavaConversions.seqAsJavaList;
 
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
 
 public class GetTablesJob extends SparkCatalogJob {
   private final String databasePattern;
@@ -37,10 +39,11 @@ public class GetTablesJob extends SparkCatalogJob {
       String tablePattern,
       List<String> tableTypes,
       String sessionId,
-      String jobId) {
-    super(sessionId, jobId);
-    this.databasePattern = databasePattern;
-    this.tablePattern = tablePattern;
+      String jobId,
+      DataType[] resultTypes) {
+    super(sessionId, jobId, resultTypes);
+    this.databasePattern = convertSchemaPattern(databasePattern);
+    this.tablePattern = convertIdentifierPattern(tablePattern, true);
     if (tableTypes != null) {
       for (String type : tableTypes) {
         this.tableTypes.add(type.toUpperCase());
@@ -49,8 +52,8 @@ public class GetTablesJob extends SparkCatalogJob {
   }
 
   @Override
-  protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
-    List<Object[]> tableList = new ArrayList<Object[]>();
+  protected List<Row> fetchCatalogObjects(SessionCatalog catalog) {
+    List<Row> tableList = new ArrayList<Row>();
     List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern));
     for (String db : databases) {
       List<TableIdentifier> tableIdentifiers =
@@ -59,14 +62,14 @@ public class GetTablesJob extends SparkCatalogJob {
         CatalogTable table = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
         String type = convertTableType(table.tableType().name());
         if (tableTypes.isEmpty() || tableTypes.contains(type)) {
-          tableList.add(
+          tableList.add(new GenericRow(
             new Object[] {
               DEFAULT_HIVE_CATALOG,
               table.database(),
               table.identifier().table(),
               type,
               table.comment().isDefined() ? table.comment().get() : ""
-            });
+            }));
         }
       }
     }
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java
index e86721e..afbdf32 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java
@@ -19,6 +19,7 @@ package org.apache.livy.thriftserver.session;
 
 import java.util.List;
 
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
 
@@ -30,21 +31,76 @@ public abstract class SparkCatalogJob implements Job<Void> {
 
   private final String sessionId;
   private final String jobId;
+  private final DataType[] resultTypes;
 
-  public SparkCatalogJob(String sessionId, String jobId) {
+  public SparkCatalogJob(String sessionId, String jobId, DataType[] resultTypes) {
     this.sessionId = sessionId;
     this.jobId = jobId;
+    this.resultTypes = resultTypes;
   }
 
-  protected abstract List<Object[]> fetchCatalogObjects(SessionCatalog catalog);
+  protected abstract List<Row> fetchCatalogObjects(SessionCatalog catalog);
 
   @Override
   public Void call(JobContext jc) throws Exception {
     SessionCatalog catalog = ((SparkSession)jc.sparkSession()).sessionState().catalog();
-    List<Object[]> objects = fetchCatalogObjects(catalog);
+    List<Row> objects = fetchCatalogObjects(catalog);
 
     ThriftSessionState session = ThriftSessionState.get(jc, sessionId);
-    session.registerCatalogJob(jobId, objects.iterator());
+    session.registerStatement(
+        jobId, SparkUtils.dataTypesToSchema(resultTypes), objects.iterator());
     return null;
   }
+
+  /**
+   * Convert wildchars and escape sequence from JDBC format to datanucleous/regex.
+   *
+   * This is ported from Spark Hive Thrift MetaOperation.
+   */
+  protected String convertIdentifierPattern(final String pattern, boolean datanucleusFormat) {
+    if (pattern == null) {
+      return convertPattern("%", true);
+    } else {
+      return convertPattern(pattern, datanucleusFormat);
+    }
+  }
+
+  /**
+   * Convert wildchars and escape sequence of schema pattern from JDBC format to datanucleous/regex
+   * The schema pattern treats empty string also as wildchar.
+   *
+   * This is ported from Spark Hive Thrift MetaOperation.
+   */
+  protected String convertSchemaPattern(final String pattern) {
+    if ((pattern == null) || pattern.isEmpty()) {
+      return convertPattern("%", true);
+    } else {
+      return convertPattern(pattern, true);
+    }
+  }
+
+  /**
+   * Convert a pattern containing JDBC catalog search wildcards into
+   * Java regex patterns.
+   *
+   * @param pattern input which may contain '%' or '_' wildcard characters.
+   * @return replace %/_ with regex search characters, also handle escaped characters.
+   *
+   * The datanucleus module expects the wildchar as '*'. The columns search on the
+   * other hand is done locally inside the hive code and that requires the regex wildchar
+   * format '.*'  This is driven by the datanucleusFormat flag.
+   *
+   * This is ported from Spark Hive Thrift MetaOperation.
+   */
+  private String convertPattern(final String pattern, boolean datanucleusFormat) {
+    String wStr;
+    if (datanucleusFormat) {
+      wStr = "*";
+    } else {
+      wStr = ".*";
+    }
+    return pattern
+        .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", 
"%").replaceAll("^%", wStr)
+        .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", 
"_").replaceAll("^_", ".");
+  }
 }
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
index fac79ad..a91cf35 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
@@ -60,6 +60,52 @@ final class SparkUtils {
   }
 
   /**
+   * Translates our Thrift data types to Spark types
+   */
+  public static StructType dataTypesToSchema(DataType[] types) {
+    StructField[] fields = new StructField[types.length];
+    int idx = 0;
+    for (DataType dt : types) {
+      org.apache.spark.sql.types.DataType sparkDt = null;
+      switch (dt) {
+        case BOOLEAN:
+          sparkDt = DataTypes.BooleanType;
+          break;
+        case BYTE:
+          sparkDt = DataTypes.ByteType;
+          break;
+        case SHORT:
+          sparkDt = DataTypes.ShortType;
+          break;
+        case INTEGER:
+          sparkDt = DataTypes.IntegerType;
+          break;
+        case LONG:
+          sparkDt = DataTypes.LongType;
+          break;
+        case FLOAT:
+          sparkDt = DataTypes.FloatType;
+          break;
+        case DOUBLE:
+          sparkDt = DataTypes.DoubleType;
+          break;
+        case BINARY:
+          sparkDt = DataTypes.BinaryType;
+          break;
+        case STRING:
+          sparkDt = DataTypes.StringType;
+          break;
+        default:
+          throw new IllegalArgumentException("Invalid data type: " + dt);
+      }
+      fields[idx] = new StructField(
+              "col_" + idx, sparkDt, true, Metadata.empty());
+      idx++;
+    }
+    return new StructType(fields);
+  }
+
+  /**
    * This method is ported from Spark Hive Thrift server Type class
    * @param type
    * @return
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
index 9111d94..571c2e1 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
@@ -77,7 +77,6 @@ class ThriftSessionState {
     this.sessionId = sessionId;
     this.statements = new ConcurrentHashMap<>();
     this.spark = ctx.<SparkSession>sparkSession().newSession();
-    this.catalogJobStates = new ConcurrentHashMap<>();
   }
 
   SparkSession spark() {
@@ -126,35 +125,4 @@ class ThriftSessionState {
       throw new IllegalArgumentException(err);
     }
   }
-
-
-  private final ConcurrentMap<String, CatalogJobState> catalogJobStates;
-
-  void registerCatalogJob(String jobId, Iterator<Object[]> results) {
-    checkNotNull(jobId, "No catalog job ID.");
-    CatalogJobState state = new CatalogJobState(results);
-    if (catalogJobStates.putIfAbsent(jobId, state) != null) {
-      throw new IllegalStateException(
-          String.format("Catalog job %s already registered.", jobId));
-    }
-  }
-
-  CatalogJobState findCatalogJob(String jobId) {
-    checkNotNull(jobId, "No catalog job ID.");
-    CatalogJobState state = catalogJobStates.get(jobId);
-    if (state == null) {
-      throw catalogJobNotFound(jobId);
-    }
-    return state;
-  }
-
-  boolean cleanupCatalogJob(String jobId) {
-    checkNotNull(jobId, "No catalog job ID.");
-    return catalogJobStates.remove(jobId) != null;
-  }
-
-  private NoSuchElementException catalogJobNotFound(String jobId) {
-    return new NoSuchElementException(
-        String.format("Catalog job %s not found in session %s.", jobId, 
sessionId));
-  }
 }
diff --git a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
index addc630..26c5c5a 100644
--- a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
+++ b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
@@ -132,49 +132,61 @@ public class ThriftSessionTest {
     String s3 = nextSession();
     waitFor(new RegisterSessionJob(s3));
     String getSchemaJobId = "test_get_schema_job";
-    waitFor(new GetSchemasJob("default", s3, getSchemaJobId));
-    List<Object[]> schemas = waitFor(
-        new FetchCatalogResultJob(s3, getSchemaJobId, Integer.MAX_VALUE));
-    assertEquals(1, schemas.size());
-    assertEquals("default", schemas.get(0)[0]);
-    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getSchemaJobId)));
+    DataType[] resultTypes = new DataType[] { DataType.STRING, DataType.STRING };
+    waitFor(new GetSchemasJob("default", s3, getSchemaJobId, resultTypes));
+    rs = waitFor(new FetchResultJob(s3, getSchemaJobId, Integer.MAX_VALUE));
+    cols = rs.getColumns();
+    assertEquals(1, cols[0].size());
+    assertEquals("default", cols[0].get(0));
+    assertTrue(waitFor(new CleanupStatementJob(s3, getSchemaJobId)));
 
     String getTablesJobId = "test_get_tables_job";
     List<String> testTableTypes = new ArrayList<>();
     testTableTypes.add("Table");
-    waitFor(new GetTablesJob("default", "*",
-        testTableTypes, s3, getTablesJobId));
-    List<Object[]> tables = waitFor(
-        new FetchCatalogResultJob(s3, getTablesJobId, Integer.MAX_VALUE));
-    assertEquals(1, tables.size());
-    assertEquals("default", tables.get(0)[1]);
-    assertEquals("test", tables.get(0)[2]);
-    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getTablesJobId)));
+    resultTypes = new DataType[] {
+        DataType.STRING, DataType.STRING, DataType.STRING, DataType.STRING, DataType.STRING };
+    waitFor(new GetTablesJob(
+        "default", "*", testTableTypes, s3, getTablesJobId, resultTypes));
+    rs = waitFor(new FetchResultJob(s3, getTablesJobId, Integer.MAX_VALUE));
+    cols = rs.getColumns();
+    assertEquals(1, cols[0].size());
+    assertEquals("default", cols[1].get(0));
+    assertEquals("test", cols[2].get(0));
+    assertTrue(waitFor(new CleanupStatementJob(s3, getTablesJobId)));
 
     String getColumnsJobId = "test_get_columns_job";
-    waitFor(new GetColumnsJob("default", "test", ".*", s3, getColumnsJobId));
-    List<Object[]> columns = waitFor(
-        new FetchCatalogResultJob(s3, getColumnsJobId, Integer.MAX_VALUE));
-    assertEquals(2, columns.size());
-    assertEquals("default", columns.get(0)[1]);
-    assertEquals("test", columns.get(0)[2]);
-    assertEquals("id", columns.get(0)[3]);
-    assertEquals("integer", columns.get(0)[5]);
-    assertEquals("default", columns.get(1)[1]);
-    assertEquals("test", columns.get(1)[2]);
-    assertEquals("desc", columns.get(1)[3]);
-    assertEquals("string", columns.get(1)[5]);
-    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getColumnsJobId)));
+    resultTypes = new DataType[] {
+        DataType.STRING, DataType.STRING, DataType.STRING, DataType.STRING, DataType.INTEGER,
+        DataType.STRING, DataType.INTEGER, DataType.BYTE, DataType.INTEGER, DataType.INTEGER,
+        DataType.INTEGER, DataType.STRING, DataType.STRING, DataType.INTEGER, DataType.INTEGER,
+        DataType.INTEGER, DataType.INTEGER, DataType.STRING, DataType.STRING, DataType.STRING,
+        DataType.STRING, DataType.SHORT, DataType.STRING };
+    waitFor(new GetColumnsJob("default", "test", ".*", s3, getColumnsJobId, 
resultTypes));
+    rs = waitFor(new FetchResultJob(s3, getColumnsJobId, Integer.MAX_VALUE));
+    cols = rs.getColumns();
+    assertEquals(2, cols[0].size());
+    assertEquals("default", cols[1].get(0));
+    assertEquals("test", cols[2].get(0));
+    assertEquals("id", cols[3].get(0));
+    assertEquals("integer", cols[5].get(0));
+    assertEquals("default", cols[1].get(1));
+    assertEquals("test", cols[2].get(1));
+    assertEquals("desc", cols[3].get(1));
+    assertEquals("string", cols[5].get(1));
+    assertTrue(waitFor(new CleanupStatementJob(s3, getColumnsJobId)));
 
     String getFunctionsJobId = "test_get_functions_job";
-    waitFor(new GetFunctionsJob("default", "unix_timestamp", s3, 
getFunctionsJobId));
-    List<Object[]> functions = waitFor(
-        new FetchCatalogResultJob(s3, getFunctionsJobId, Integer.MAX_VALUE));
-    assertEquals(1, functions.size());
-    assertNull(functions.get(0)[1]);
-    assertEquals("unix_timestamp", functions.get(0)[2]);
-    assertEquals("org.apache.spark.sql.catalyst.expressions.UnixTimestamp", 
functions.get(0)[5]);
-    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getFunctionsJobId)));
+    resultTypes = new DataType[] {
+        DataType.STRING, DataType.STRING, DataType.STRING, DataType.STRING, DataType.INTEGER,
+        DataType.STRING };
+    waitFor(new GetFunctionsJob("default", "unix_timestamp", s3, 
getFunctionsJobId, resultTypes));
+    rs = waitFor(new FetchResultJob(s3, getFunctionsJobId, Integer.MAX_VALUE));
+    cols = rs.getColumns();
+    assertEquals(1, cols[0].size());
+    assertNull(cols[1].get(0));
+    assertEquals("unix_timestamp", cols[2].get(0));
+    assertEquals("org.apache.spark.sql.catalyst.expressions.UnixTimestamp", 
cols[5].get(0));
+    assertTrue(waitFor(new CleanupStatementJob(s3, getFunctionsJobId)));
 
     // Tear down the session.
     waitFor(new UnregisterSessionJob(s3));
