CurtHagenlocher commented on code in PR #2665:
URL: https://github.com/apache/arrow-adbc/pull/2665#discussion_r2027279614
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs:
##########
@@ -909,6 +915,237 @@ private static StructArray GetTableSchemas(
nullBitmapBuffer.Build());
}
+ internal async Task<TGetCatalogsResp>
GetCatalogsAsync(CancellationToken cancellationToken)
+ {
+ if (SessionHandle == null)
+ {
+ throw new InvalidOperationException("Invalid session");
+ }
+
+ TGetCatalogsReq req = new TGetCatalogsReq(SessionHandle);
+ if (AreResultsAvailableDirectly())
+ {
+ SetDirectResults(req);
+ }
+
+ TGetCatalogsResp resp = await Client.GetCatalogs(req,
cancellationToken);
+ if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+ {
+ throw new HiveServer2Exception(resp.Status.ErrorMessage)
+ .SetNativeError(resp.Status.ErrorCode)
+ .SetSqlState(resp.Status.SqlState);
+ }
+
+ return resp;
+ }
+
+ internal async Task<TGetSchemasResp> GetSchemasAsync(string?
catalogName, string? schemaName, CancellationToken cancellationToken)
+ {
+ if (SessionHandle == null)
+ {
+ throw new InvalidOperationException("Invalid session");
+ }
+
+ TGetSchemasReq req = new(SessionHandle);
+ if (AreResultsAvailableDirectly())
+ {
+ SetDirectResults(req);
+ }
+ if (!string.IsNullOrEmpty(catalogName))
+ {
+ req.CatalogName = catalogName;
+ }
+ if (!string.IsNullOrEmpty(schemaName))
+ {
+ req.SchemaName = schemaName;
+ }
+
+ TGetSchemasResp resp = await Client.GetSchemas(req,
cancellationToken);
+ if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+ {
+ throw new HiveServer2Exception(resp.Status.ErrorMessage)
+ .SetNativeError(resp.Status.ErrorCode)
+ .SetSqlState(resp.Status.SqlState);
+ }
+
+ return resp;
+ }
+
+ internal async Task<TGetTablesResp> GetTablesAsync(string?
catalogName, string? schemaName, string? tableName, List<string>? tableTypes,
CancellationToken cancellationToken)
+ {
+ if (SessionHandle == null)
+ {
+ throw new InvalidOperationException("Invalid session");
+ }
+
+ TGetTablesReq req = new(SessionHandle);
+ if (AreResultsAvailableDirectly())
+ {
+ SetDirectResults(req);
+ }
+ if (!string.IsNullOrEmpty(catalogName))
+ {
+ req.CatalogName = catalogName;
+ }
+ if (!string.IsNullOrEmpty(schemaName))
+ {
+ req.SchemaName = schemaName;
+ }
+ if (!string.IsNullOrEmpty(tableName))
+ {
+ req.TableName = tableName;
+ }
+ if (tableTypes != null && tableTypes.Count > 0)
+ {
+ req.TableTypes = tableTypes;
+ }
+
+ TGetTablesResp resp = await Client.GetTables(req,
cancellationToken);
+ if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+ {
+ throw new HiveServer2Exception(resp.Status.ErrorMessage)
+ .SetNativeError(resp.Status.ErrorCode)
+ .SetSqlState(resp.Status.SqlState);
+ }
+
+ return resp;
+ }
+
+ internal async Task<TGetColumnsResp> GetColumnsAsync(string?
catalogName, string? schemaName, string? tableName, string? columnName,
CancellationToken cancellationToken)
+ {
+ if (SessionHandle == null)
+ {
+ throw new InvalidOperationException("Invalid session");
+ }
+
+ TGetColumnsReq req = new(SessionHandle);
+ if (AreResultsAvailableDirectly())
+ {
+ SetDirectResults(req);
+ }
+ if (!string.IsNullOrEmpty(catalogName))
+ {
+ req.CatalogName = catalogName;
+ }
+ if (!string.IsNullOrEmpty(schemaName))
+ {
+ req.SchemaName = schemaName;
+ }
+ if (!string.IsNullOrEmpty(tableName))
+ {
+ req.TableName = tableName;
+ }
+ if (!string.IsNullOrEmpty(columnName))
+ {
+ req.ColumnName = columnName;
+ }
+
+ TGetColumnsResp resp = await Client.GetColumns(req,
cancellationToken);
+ if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+ {
+ throw new HiveServer2Exception(resp.Status.ErrorMessage)
+ .SetNativeError(resp.Status.ErrorCode)
+ .SetSqlState(resp.Status.SqlState);
+ }
+
+ return resp;
+ }
+
+ internal async Task<TGetPrimaryKeysResp> GetPrimaryKeysAsync(
+ string? catalogName,
+ string? schemaName,
+ string? tableName,
+ CancellationToken cancellationToken = default)
+ {
+ if (SessionHandle == null)
+ {
+ throw new InvalidOperationException("Invalid session");
+ }
+
+ TGetPrimaryKeysReq req = new(SessionHandle);
+ if (AreResultsAvailableDirectly())
+ {
+ SetDirectResults(req);
+ }
+ if (!string.IsNullOrWhiteSpace(catalogName))
+ {
+ req.CatalogName = catalogName!;
+ }
+ if (!string.IsNullOrWhiteSpace(schemaName))
+ {
+ req.SchemaName = schemaName!;
+ }
+ if (!string.IsNullOrWhiteSpace(tableName))
+ {
+ req.TableName = tableName!;
+ }
+
+ TGetPrimaryKeysResp resp = await Client.GetPrimaryKeys(req,
cancellationToken);
+ if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+ {
+ throw new HiveServer2Exception(resp.Status.ErrorMessage)
+ .SetNativeError(resp.Status.ErrorCode)
+ .SetSqlState(resp.Status.SqlState);
+ }
+
+ return resp;
+ }
+
+ internal async Task<TGetCrossReferenceResp> GetCrossReferenceAsync(
+ string? catalogName,
+ string? schemaName,
+ string? tableName,
+ string? foreignCatalogName,
+ string? foreignSchemaName,
+ string? foreignTableName,
+ CancellationToken cancellationToken = default)
+ {
+ if (SessionHandle == null)
+ {
+ throw new InvalidOperationException("Invalid session");
+ }
+
+ TGetCrossReferenceReq req = new(SessionHandle);
+ if (AreResultsAvailableDirectly())
+ {
+ SetDirectResults(req);
+ }
+ if (!string.IsNullOrWhiteSpace(catalogName))
+ {
+ req.ParentCatalogName = catalogName!;
+ }
+ if (!string.IsNullOrWhiteSpace(schemaName))
+ {
+ req.ParentSchemaName = schemaName!;
+ }
+ if (!string.IsNullOrWhiteSpace(tableName))
+ {
+ req.ParentTableName = tableName!;
+ }
+ if (!string.IsNullOrWhiteSpace(foreignCatalogName))
+ {
+ req.ForeignCatalogName = foreignCatalogName!;
+ }
+ if (!string.IsNullOrWhiteSpace(schemaName))
+ {
+ req.ForeignSchemaName = foreignSchemaName!;
+ }
+ if (!string.IsNullOrWhiteSpace(tableName))
+ {
+ req.ForeignTableName = foreignTableName!;
+ }
+
+
Review Comment:
nit: extra blank line
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs:
##########
@@ -909,6 +915,237 @@ private static StructArray GetTableSchemas(
nullBitmapBuffer.Build());
}
+ internal async Task<TGetCatalogsResp>
GetCatalogsAsync(CancellationToken cancellationToken)
+ {
+ if (SessionHandle == null)
+ {
+ throw new InvalidOperationException("Invalid session");
+ }
+
+ TGetCatalogsReq req = new TGetCatalogsReq(SessionHandle);
+ if (AreResultsAvailableDirectly())
+ {
+ SetDirectResults(req);
+ }
+
+ TGetCatalogsResp resp = await Client.GetCatalogs(req,
cancellationToken);
+ if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+ {
+ throw new HiveServer2Exception(resp.Status.ErrorMessage)
+ .SetNativeError(resp.Status.ErrorCode)
+ .SetSqlState(resp.Status.SqlState);
+ }
+
+ return resp;
+ }
+
+ internal async Task<TGetSchemasResp> GetSchemasAsync(string?
catalogName, string? schemaName, CancellationToken cancellationToken)
+ {
+ if (SessionHandle == null)
+ {
+ throw new InvalidOperationException("Invalid session");
+ }
+
+ TGetSchemasReq req = new(SessionHandle);
+ if (AreResultsAvailableDirectly())
+ {
+ SetDirectResults(req);
+ }
+ if (!string.IsNullOrEmpty(catalogName))
Review Comment:
Could there be a difference in semantics between "value not set", "value is
null" and "value is blank" when it comes to setting these parameter values? For
instance, maybe "value is blank" matches only a blank catalog name while "value
not set" matches any catalog name? Obviously we can't distinguish between all
three possibilities in C# without doing something extraordinary, but I wonder
whether it's correct to treat null and blank identically.
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -255,5 +320,120 @@ protected void
ValidateOptions(IReadOnlyDictionary<string, string> properties)
}
}
}
+
+ private async Task<QueryResult>
ExecuteMetadataCommandQuery(CancellationToken cancellationToken)
+ {
+ return SqlQuery?.ToLowerInvariant() switch
+ {
+ GetCatalogsCommandName => await
GetCatalogsAsync(cancellationToken),
+ GetSchemasCommandName => await
GetSchemasAsync(cancellationToken),
+ GetTablesCommandName => await
GetTablesAsync(cancellationToken),
+ GetColumnsCommandName => await
GetColumnsAsync(cancellationToken),
+ GetPrimaryKeysCommandName => await
GetPrimaryKeysAsync(cancellationToken),
+ GetCrossReferenceCommandName => await
GetCrossReferenceAsync(cancellationToken),
+ null => throw new ArgumentNullException(nameof(SqlQuery),
$"Metadata command for property 'SqlQuery' must not be empty or null. Supported
metadata commands: {SupportedMetadataCommands}"),
+ "" => throw new ArgumentNullException(nameof(SqlQuery),
$"Metadata command for property 'SqlQuery' must not be empty or null. Supported
metadata commands: {SupportedMetadataCommands}"),
Review Comment:
I believe it's possible to use `null or ""` as the pattern of a single
switch expression arm, which would merge these two identical arms.
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs:
##########
@@ -909,6 +915,237 @@ private static StructArray GetTableSchemas(
nullBitmapBuffer.Build());
}
+ internal async Task<TGetCatalogsResp>
GetCatalogsAsync(CancellationToken cancellationToken)
+ {
+ if (SessionHandle == null)
+ {
+ throw new InvalidOperationException("Invalid session");
+ }
+
+ TGetCatalogsReq req = new TGetCatalogsReq(SessionHandle);
+ if (AreResultsAvailableDirectly())
+ {
+ SetDirectResults(req);
+ }
+
+ TGetCatalogsResp resp = await Client.GetCatalogs(req,
cancellationToken);
+ if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+ {
+ throw new HiveServer2Exception(resp.Status.ErrorMessage)
+ .SetNativeError(resp.Status.ErrorCode)
+ .SetSqlState(resp.Status.SqlState);
+ }
+
+ return resp;
+ }
+
+ internal async Task<TGetSchemasResp> GetSchemasAsync(string?
catalogName, string? schemaName, CancellationToken cancellationToken)
+ {
+ if (SessionHandle == null)
+ {
+ throw new InvalidOperationException("Invalid session");
+ }
+
+ TGetSchemasReq req = new(SessionHandle);
+ if (AreResultsAvailableDirectly())
+ {
+ SetDirectResults(req);
+ }
+ if (!string.IsNullOrEmpty(catalogName))
Review Comment:
(Applies to all these parameters, of course, and not just catalog name.)
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs:
##########
@@ -122,29 +122,41 @@ public HiveServer2Reader(
}
}
- private RecordBatch CreateBatch(TFetchResultsResp response, int
columnCount, int rowCount)
+ private RecordBatch CreateBatch(TFetchResultsResp response, int
columnCount, int rowCount)
{
- IList<IArrowArray> columnData = [];
- bool shouldConvertScalar =
_dataTypeConversion.HasFlag(DataTypeConversion.Scalar);
+ IReadOnlyList<IArrowArray> columnData =
GetArrowArrayData(response.Results, columnCount, Schema, _dataTypeConversion);
+
+ return new RecordBatch(Schema, columnData, rowCount);
+ }
+
+ internal static IReadOnlyList<IArrowArray> GetArrowArrayData(TRowSet
response, int columnCount, Schema schema, DataTypeConversion dataTypeConversion)
+ {
+ List<IArrowArray> columnData = [];
+ bool shouldConvertScalar =
dataTypeConversion.HasFlag(DataTypeConversion.Scalar);
for (int i = 0; i < columnCount; i++)
{
- IArrowType? expectedType = shouldConvertScalar ?
Schema.FieldsList[i].DataType : null;
- IArrowArray columnArray =
GetArray(response.Results.Columns[i], expectedType);
+ IArrowType? expectedType = shouldConvertScalar ?
schema.FieldsList[i].DataType : null;
+ IArrowArray columnArray = GetArray(response.Columns[i],
expectedType);
columnData.Add(columnArray);
}
- return new RecordBatch(Schema, columnData, rowCount);
+ return columnData;
}
- private static int GetColumnCount(TFetchResultsResp response) =>
- response.Results.Columns.Count;
+ //private RecordBatch CreateBatch(TFetchResultsResp response, int
columnCount, int rowCount)
+ //{
+ // return CreateBatch(response.Results, columnCount, rowCount,
Schema, _dataTypeConversion);
+ //}
Review Comment:
If this is not needed, is there any reason not to remove it?
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs:
##########
@@ -122,29 +122,41 @@ public HiveServer2Reader(
}
}
- private RecordBatch CreateBatch(TFetchResultsResp response, int
columnCount, int rowCount)
+ private RecordBatch CreateBatch(TFetchResultsResp response, int
columnCount, int rowCount)
Review Comment:
nit: extra space after `private`
##########
csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs:
##########
@@ -255,5 +320,120 @@ protected void
ValidateOptions(IReadOnlyDictionary<string, string> properties)
}
}
}
+
+ private async Task<QueryResult>
ExecuteMetadataCommandQuery(CancellationToken cancellationToken)
+ {
+ return SqlQuery?.ToLowerInvariant() switch
+ {
+ GetCatalogsCommandName => await
GetCatalogsAsync(cancellationToken),
+ GetSchemasCommandName => await
GetSchemasAsync(cancellationToken),
+ GetTablesCommandName => await
GetTablesAsync(cancellationToken),
+ GetColumnsCommandName => await
GetColumnsAsync(cancellationToken),
+ GetPrimaryKeysCommandName => await
GetPrimaryKeysAsync(cancellationToken),
+ GetCrossReferenceCommandName => await
GetCrossReferenceAsync(cancellationToken),
+ null => throw new ArgumentNullException(nameof(SqlQuery),
$"Metadata command for property 'SqlQuery' must not be empty or null. Supported
metadata commands: {SupportedMetadataCommands}"),
+ "" => throw new ArgumentNullException(nameof(SqlQuery),
$"Metadata command for property 'SqlQuery' must not be empty or null. Supported
metadata commands: {SupportedMetadataCommands}"),
+ _ => throw new NotSupportedException($"Metadata command
'{SqlQuery}' is not supported. Supported metadata commands:
{SupportedMetadataCommands}"),
+ };
+ }
+
+ private async Task<QueryResult>
GetCrossReferenceAsync(CancellationToken cancellationToken = default)
+ {
+ TGetCrossReferenceResp resp = await
Connection.GetCrossReferenceAsync(
+ CatalogName,
+ SchemaName,
+ TableName,
+ ForeignCatalogName,
+ ForeignSchemaName,
+ ForeignTableName,
+ cancellationToken);
+ OperationHandle = resp.OperationHandle;
+
+ return await GetQueryResult(resp.DirectResults, cancellationToken);
+ }
+
+ private async Task<QueryResult> GetPrimaryKeysAsync(CancellationToken
cancellationToken = default)
+ {
+ // Note: allows catalog/schema/table to be null or empty
Review Comment:
Is there a specific reason to call this out here vs other methods?
##########
csharp/src/Drivers/Apache/ApacheParameters.cs:
##########
@@ -25,5 +25,15 @@ public class ApacheParameters
public const string PollTimeMilliseconds =
"adbc.apache.statement.polltime_ms";
public const string BatchSize = "adbc.apache.statement.batch_size";
public const string QueryTimeoutSeconds =
"adbc.apache.statement.query_timeout_s";
+ public const string IsMetadataCommand =
"adbc.apache.statement.is_metadata_command";
+
+ public const string CatalogName = "adbc.apache.catalog_name";
+ public const string SchemaName = "adbc.apache.schema_name";
+ public const string TableName = "adbc.apache.table_name";
+ public const string TableTypes = "adbc.apache.table_types";
+ public const string ColumnName = "adbc.apache.column_name";
+ public const string ForeignCatalogName =
"adbc.apache.foreign_catalog_name";
+ public const string ForeignSchemaName =
"adbc.apache.foreign_schema_name";
+ public const string ForeignTableName =
"adbc.apache.foreign_table_name";
Review Comment:
I wish I had insisted more strongly on a different set of names in the
beginning, because now we're never going to change them :(. I find it "just
weird" to call these things "adbc.apache", as if the entire project weren't an
Apache project already. They should have been named something like
"adbc.hiveserver2".
Anyway, I think we should take a cue from the existing option names for
ingestion (e.g. `adbc.ingest.target_catalog`) and name these after their
semantic purpose, so maybe something like `adbc.get_metadata.target_catalog`,
`adbc.get_metadata.target_db_schema`, etc., aiming for maximum consistency
with the metadata identifier names that already exist.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]