wuchong commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r931189354


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +107,249 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testToTTableSchema(DataTypeSpec spec) {
+        TableSchema actual =
+                new TableSchema(
+                        toTTableSchema(DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+        List<Integer> javaSqlTypes =
+                Arrays.stream(actual.toTypeDescriptors())
+                        .map(desc -> desc.getType().toJavaSQLType())
+                        .collect(Collectors.toList());
+
+        assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToColumnBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toColumnBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V10);
+        Iterator<Object[]> iterator = rowSet.iterator();
+        if (spec.flinkType.getChildren().equals(Collections.singletonList(BYTES()))) {
+            assertArrayEquals((byte[]) spec.convertedColumnBasedValue, (byte[]) iterator.next()[0]);
+        } else {
+            assertEquals(spec.convertedColumnBasedValue, iterator.next()[0]);
+        }
+
+        assertEquals(spec.convertedNullValue, iterator.next()[0]);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToRowBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toRowBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V3);
+        Iterator<Object[]> iter = rowSet.iterator();
+        assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+        assertEquals(spec.convertedNullValue, iter.next()[0]);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", DataTypes.INT())),
+                                            Collections.singletonList(
+                                                    GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static List<DataTypeSpec> getDataTypeSpecs() {
+        Map<StringData, StringData> map = new HashMap<>();
+        map.put(StringData.fromString("World"), StringData.fromString("Hello"));
+        map.put(StringData.fromString("Hello"), StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .forType(BOOLEAN())
+                        .forValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .forType(TINYINT())
+                        .forValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .forType(SMALLINT())
+                        .forValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                DataTypeSpec.newSpec().forType(INT()).forValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .forType(BIGINT())
+                        .forValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .forType(FLOAT())
+                        .forValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .forType(DOUBLE())
+                        .forValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .forType(DECIMAL(9, 6))
+                        .forValue(DecimalData.fromBigDecimal(new BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .forType(STRING())
+                        .forValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .forType(BYTES())
+                        .forValue("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue(
+                                new String("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                                "Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                DataTypeSpec.newSpec()
+                        .forType(DATE())
+                        .forValue((int) LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .forType(TIMESTAMP(4))
+                        .forValue(
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .forType(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
+                        .forValue(mapData)
+                        .expectSqlType(Types.JAVA_OBJECT)
+                        .expectValue("{\"Hello\":\"World\",\"World\":\"Hello\"}")
+                        .expectNullValue("null"),
+                DataTypeSpec.newSpec()
+                        .forType(
+                                DataTypes.ARRAY(
+                                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
+                        .forValue(new GenericArrayData(new Object[] {mapData, mapData}))
+                        // Hive uses STRING type
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue(
+                                "[{\"Hello\":\"World\",\"World\":\"Hello\"},{\"Hello\":\"World\",\"World\":\"Hello\"}]")
+                        .expectNullValue("null"));

Review Comment:
   Why is there a null value for the complex types?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -419,44 +569,49 @@ public boolean equals(Object o) {
         }
         HiveServer2Endpoint that = (HiveServer2Endpoint) o;
 
-        return minWorkerThreads == that.minWorkerThreads
+        return Objects.equals(host, that.host)
+                && port == that.port
+                && minWorkerThreads == that.minWorkerThreads
                 && maxWorkerThreads == that.maxWorkerThreads
                 && requestTimeoutMs == that.requestTimeoutMs
                 && backOffSlotLengthMs == that.backOffSlotLengthMs
                 && maxMessageSize == that.maxMessageSize
-                && port == that.port
                && Objects.equals(workerKeepAliveTime, that.workerKeepAliveTime)
                 && Objects.equals(catalogName, that.catalogName)
                 && Objects.equals(defaultDatabase, that.defaultDatabase)
                 && Objects.equals(hiveConfPath, that.hiveConfPath)
                 && Objects.equals(allowEmbedded, that.allowEmbedded)
+                && Objects.equals(isVerbose, that.isVerbose)
                 && Objects.equals(moduleName, that.moduleName);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
+                host,
+                port,
                 minWorkerThreads,
                 maxWorkerThreads,
                 workerKeepAliveTime,
                 requestTimeoutMs,
                 backOffSlotLengthMs,
                 maxMessageSize,
-                port,
                 catalogName,
                 defaultDatabase,
                 hiveConfPath,
                 allowEmbedded,
+                isVerbose,
                 moduleName);
     }
 
     @Override
     public void run() {
         try {
-            LOG.info("HiveServer2 Endpoint begins to listen on {}.", port);
+            LOG.info("HiveServer2 Endpoint begins to listen on {}:{}.", host, port);

Review Comment:
   The hostname may not be resolvable by external clients. We should print the IP address here.
   
   You can resolve the host into an IP address in the class constructor.
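   
   A minimal, self-contained sketch of that suggestion (the class and field names here are hypothetical, not the PR's actual code): resolve the configured host once in the constructor with `java.net.InetAddress`, so resolution failures surface at startup and the log line prints a reachable address.
   
       import java.net.InetAddress;
       import java.net.UnknownHostException;
       
       public class HostResolutionSketch {
           private final int port;
           // The externally reachable address, resolved once at construction time.
           private final String hostAddress;
       
           public HostResolutionSketch(String host, int port) {
               this.port = port;
               try {
                   // Eagerly resolve the configured hostname to an IP address.
                   this.hostAddress = InetAddress.getByName(host).getHostAddress();
               } catch (UnknownHostException e) {
                   throw new IllegalArgumentException("Cannot resolve host: " + host, e);
               }
           }
       
           public void run() {
               // The log statement then prints the resolved IP instead of the raw hostname.
               System.out.printf("HiveServer2 Endpoint begins to listen on %s:%d.%n", hostAddress, port);
           }
       }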



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -290,12 +309,34 @@ public void close() {
         }
 
         public ResultSet fetchResults(long token, int maxRows) {
+            return fetchResultsInternal(() -> resultFetcher.fetchResults(token, maxRows));
+        }
+
+        public ResultSet fetchResults(FetchOrientation orientation, int maxRows) {
+            return fetchResultsInternal(() -> resultFetcher.fetchResults(orientation, maxRows));
+        }
+        }
+
+        public ResolvedSchema getResultSchema() {
+            OperationStatus current = status.get();
+            if (current != OperationStatus.FINISHED || !hasResults) {

Review Comment:
   This is not addressed. It seems `hasResults` is redundant here?
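   
   A self-contained sketch of what that simplification might look like (illustrative only; `OperationStatus` is reduced to the two states used here, and the schema is stubbed rather than delegated to a real result fetcher): once the guard requires FINISHED, a separate `hasResults` flag adds nothing.
   
       import java.util.concurrent.atomic.AtomicReference;
       
       public class GetResultSchemaSketch {
           enum OperationStatus { RUNNING, FINISHED }
       
           private final AtomicReference<OperationStatus> status =
                   new AtomicReference<>(OperationStatus.RUNNING);
       
           public String getResultSchema() {
               OperationStatus current = status.get();
               if (current != OperationStatus.FINISHED) {
                   // FINISHED is the only state in which results (and hence a schema)
                   // exist, so no additional hasResults check is required.
                   throw new IllegalStateException(
                           "The result schema is only available in FINISHED state, but was " + current);
               }
               return "stubbed schema"; // would delegate to the result fetcher
           }
       
           public static void main(String[] args) {
               GetResultSchemaSketch op = new GetResultSchemaSketch();
               op.status.set(OperationStatus.FINISHED);
               System.out.println(op.getResultSchema());
           }
       }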



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +107,249 @@ public void testConvertSessionHandle() {
[hunk identical to the one quoted in the first comment above; this review comment is anchored at the lines below]
+                DataTypeSpec.newSpec()
+                        .forType(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))

Review Comment:
   Could you create an issue to track support for non-string keys in the future?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
