This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 81902e81c43 [FLINK-32211][sql-client] Supports row format in executor
81902e81c43 is described below

commit 81902e81c43990c48d05ec81469b7d99c7922698
Author: Shammon FY <zjur...@gmail.com>
AuthorDate: Mon May 29 15:01:29 2023 +0800

    [FLINK-32211][sql-client] Supports row format in executor
    
    In this commit, we also removed DataConverter in flink-sql-jdbc-driver module.
    
    Close apache/flink#22671
---
 .../flink/table/client/gateway/Executor.java       |   9 ++
 .../flink/table/client/gateway/ExecutorImpl.java   |  37 +++++++-
 .../apache/flink/table/jdbc/FlinkConnection.java   |   4 +-
 .../apache/flink/table/jdbc/FlinkResultSet.java    |  50 +++++-----
 .../apache/flink/table/jdbc/FlinkStatement.java    |   5 +-
 .../flink/table/jdbc/utils/DataConverter.java      |  88 -----------------
 .../table/jdbc/utils/DatabaseMetaDataUtils.java    |   6 +-
 .../table/jdbc/utils/DefaultDataConverter.java     | 105 ---------------------
 .../table/jdbc/utils/StringDataConverter.java      | 105 ---------------------
 .../flink/table/jdbc/FlinkResultSetTest.java       |  47 ++++-----
 10 files changed, 89 insertions(+), 367 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index b636d326560..3be128e0708 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.client.gateway;
 
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.gateway.service.context.DefaultContext;
 
 import java.io.Closeable;
@@ -36,6 +37,14 @@ public interface Executor extends Closeable {
         return new ExecutorImpl(defaultContext, address, sessionId);
     }
 
+    static Executor create(
+            DefaultContext defaultContext,
+            InetSocketAddress address,
+            String sessionId,
+            RowFormat rowFormat) {
+        return new ExecutorImpl(defaultContext, address, sessionId, rowFormat);
+    }
+
     static Executor create(DefaultContext defaultContext, URL address, String 
sessionId) {
         return new ExecutorImpl(defaultContext, address, sessionId);
     }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index 4ec7eb3e8eb..f4f6b40ed56 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -112,6 +112,7 @@ public class ExecutorImpl implements Executor {
 
     private final SqlGatewayRestAPIVersion connectionVersion;
     private final SessionHandle sessionHandle;
+    private final RowFormat rowFormat;
 
     public ExecutorImpl(
             DefaultContext defaultContext, InetSocketAddress gatewayAddress, 
String sessionId) {
@@ -119,11 +120,30 @@ public class ExecutorImpl implements Executor {
                 defaultContext,
                 NetUtils.socketToUrl(gatewayAddress),
                 sessionId,
-                HEARTBEAT_INTERVAL_MILLISECONDS);
+                HEARTBEAT_INTERVAL_MILLISECONDS,
+                RowFormat.PLAIN_TEXT);
+    }
+
+    public ExecutorImpl(
+            DefaultContext defaultContext,
+            InetSocketAddress gatewayAddress,
+            String sessionId,
+            RowFormat rowFormat) {
+        this(
+                defaultContext,
+                NetUtils.socketToUrl(gatewayAddress),
+                sessionId,
+                HEARTBEAT_INTERVAL_MILLISECONDS,
+                rowFormat);
     }
 
     public ExecutorImpl(DefaultContext defaultContext, URL gatewayUrl, String 
sessionId) {
-        this(defaultContext, gatewayUrl, sessionId, 
HEARTBEAT_INTERVAL_MILLISECONDS);
+        this(
+                defaultContext,
+                gatewayUrl,
+                sessionId,
+                HEARTBEAT_INTERVAL_MILLISECONDS,
+                RowFormat.PLAIN_TEXT);
     }
 
     @VisibleForTesting
@@ -132,7 +152,12 @@ public class ExecutorImpl implements Executor {
             InetSocketAddress gatewayAddress,
             String sessionId,
             long heartbeatInterval) {
-        this(defaultContext, NetUtils.socketToUrl(gatewayAddress), sessionId, 
heartbeatInterval);
+        this(
+                defaultContext,
+                NetUtils.socketToUrl(gatewayAddress),
+                sessionId,
+                heartbeatInterval,
+                RowFormat.PLAIN_TEXT);
     }
 
     @VisibleForTesting
@@ -140,9 +165,11 @@ public class ExecutorImpl implements Executor {
             DefaultContext defaultContext,
             URL gatewayUrl,
             String sessionId,
-            long heartbeatInterval) {
+            long heartbeatInterval,
+            RowFormat rowFormat) {
         this.registry = new AutoCloseableRegistry();
         this.gatewayUrl = gatewayUrl;
+        this.rowFormat = rowFormat;
         try {
             // register required resource
             this.executorService = Executors.newCachedThreadPool();
@@ -433,7 +460,7 @@ public class ExecutorImpl implements Executor {
             return sendRequest(
                             FetchResultsHeaders.getDefaultInstance(),
                             new FetchResultsMessageParameters(
-                                    sessionHandle, operationHandle, token, 
RowFormat.PLAIN_TEXT),
+                                    sessionHandle, operationHandle, token, 
rowFormat),
                             EmptyRequestBody.getInstance())
                     .get();
         } catch (InterruptedException e) {
diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
index 8c0f1a237c6..7fdc99aeddf 100644
--- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
+++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.jdbc;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.gateway.service.context.DefaultContext;
 import org.apache.flink.table.jdbc.utils.DriverUtils;
 
@@ -59,7 +60,8 @@ public class FlinkConnection extends BaseConnection {
                                 
DriverUtils.fromProperties(driverUri.getProperties()),
                                 Collections.emptyList()),
                         driverUri.getAddress(),
-                        UUID.randomUUID().toString());
+                        UUID.randomUUID().toString(),
+                        RowFormat.JSON);
         driverUri.getCatalog().ifPresent(this::setSessionCatalog);
         driverUri.getDatabase().ifPresent(this::setSessionSchema);
     }
diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
index 9bd6b6fe2f8..6b8306d4cee 100644
--- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
+++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
@@ -21,8 +21,8 @@ package org.apache.flink.table.jdbc;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.client.gateway.StatementResult;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.jdbc.utils.CloseableResultIterator;
-import org.apache.flink.table.jdbc.utils.DataConverter;
 import org.apache.flink.table.jdbc.utils.StatementResultIterator;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -52,30 +52,20 @@ public class FlinkResultSet extends BaseResultSet {
     private final List<String> columnNameList;
     private final Statement statement;
     private final CloseableResultIterator<RowData> iterator;
-    private final DataConverter dataConverter;
     private final FlinkResultSetMetaData resultSetMetaData;
     private RowData currentRow;
     private boolean wasNull;
 
     private volatile boolean closed;
 
-    public FlinkResultSet(
-            Statement statement, StatementResult result, DataConverter 
dataConverter) {
-        this(
-                statement,
-                new StatementResultIterator(result),
-                result.getResultSchema(),
-                dataConverter);
+    public FlinkResultSet(Statement statement, StatementResult result) {
+        this(statement, new StatementResultIterator(result), 
result.getResultSchema());
     }
 
     public FlinkResultSet(
-            Statement statement,
-            CloseableResultIterator<RowData> iterator,
-            ResolvedSchema schema,
-            DataConverter dataConverter) {
+            Statement statement, CloseableResultIterator<RowData> iterator, 
ResolvedSchema schema) {
         this.statement = checkNotNull(statement, "Statement cannot be null");
         this.iterator = checkNotNull(iterator, "Statement result cannot be 
null");
-        this.dataConverter = checkNotNull(dataConverter, "Data converter 
cannot be null");
         this.currentRow = null;
         this.wasNull = false;
 
@@ -155,8 +145,9 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
 
+        StringData stringData = currentRow.getString(columnIndex - 1);
         try {
-            return dataConverter.getString(currentRow, columnIndex - 1);
+            return stringData == null ? null : stringData.toString();
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -168,7 +159,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return dataConverter.getBoolean(currentRow, columnIndex - 1);
+            return !currentRow.isNullAt(columnIndex - 1) && 
currentRow.getBoolean(columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -180,7 +171,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return dataConverter.getByte(currentRow, columnIndex - 1);
+            return currentRow.isNullAt(columnIndex - 1) ? 0 : 
currentRow.getByte(columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -192,7 +183,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return dataConverter.getShort(currentRow, columnIndex - 1);
+            return currentRow.isNullAt(columnIndex - 1) ? 0 : 
currentRow.getShort(columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -204,7 +195,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return dataConverter.getInt(currentRow, columnIndex - 1);
+            return currentRow.isNullAt(columnIndex - 1) ? 0 : 
currentRow.getInt(columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -217,7 +208,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidColumn(columnIndex);
 
         try {
-            return dataConverter.getLong(currentRow, columnIndex - 1);
+            return currentRow.isNullAt(columnIndex - 1) ? 0L : 
currentRow.getLong(columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -229,7 +220,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return dataConverter.getFloat(currentRow, columnIndex - 1);
+            return currentRow.isNullAt(columnIndex - 1) ? 0 : 
currentRow.getFloat(columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -241,7 +232,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return dataConverter.getDouble(currentRow, columnIndex - 1);
+            return currentRow.isNullAt(columnIndex - 1) ? 0 : 
currentRow.getDouble(columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -258,7 +249,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return dataConverter.getBinary(currentRow, columnIndex - 1);
+            return currentRow.getBinary(columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -392,11 +383,14 @@ public class FlinkResultSet extends BaseResultSet {
         }
         DecimalType decimalType = (DecimalType) dataType.getLogicalType();
         try {
-            return dataConverter.getDecimal(
-                    currentRow,
-                    columnIndex - 1,
-                    decimalType.getPrecision(),
-                    decimalType.getScale());
+            return currentRow.isNullAt(columnIndex - 1)
+                    ? null
+                    : currentRow
+                            .getDecimal(
+                                    columnIndex - 1,
+                                    decimalType.getPrecision(),
+                                    decimalType.getScale())
+                            .toBigDecimal();
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java
index 9400578b2dd..f0394bdda1a 100644
--- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java
+++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.jdbc;
 import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.StatementResult;
-import org.apache.flink.table.jdbc.utils.StringDataConverter;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
@@ -58,7 +57,7 @@ public class FlinkStatement extends BaseStatement {
         if (!result.isQueryResult()) {
             throw new SQLException(String.format("Statement[%s] is not a 
query.", sql));
         }
-        currentResults = new FlinkResultSet(this, result, 
StringDataConverter.CONVERTER);
+        currentResults = new FlinkResultSet(this, result);
 
         return currentResults;
     }
@@ -106,7 +105,7 @@ public class FlinkStatement extends BaseStatement {
     public boolean execute(String sql) throws SQLException {
         StatementResult result = executeInternal(sql);
         if (result.isQueryResult() || result.getResultKind() == 
ResultKind.SUCCESS_WITH_CONTENT) {
-            currentResults = new FlinkResultSet(this, result, 
StringDataConverter.CONVERTER);
+            currentResults = new FlinkResultSet(this, result);
             return true;
         }
 
diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java
deleted file mode 100644
index 1709932c8c1..00000000000
--- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.jdbc.utils;
-
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.TimestampData;
-
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Timestamp;
-import java.util.Map;
-
-/** Convert data from row data for result set. */
-public interface DataConverter {
-
-    /** Returns the boolean value at the given position. */
-    boolean getBoolean(RowData rowData, int pos);
-
-    /** Returns the byte value at the given position. */
-    byte getByte(RowData rowData, int pos);
-
-    /** Returns the short value at the given position. */
-    short getShort(RowData rowData, int pos);
-
-    /** Returns the integer value at the given position. */
-    int getInt(RowData rowData, int pos);
-
-    /** Returns the long value at the given position. */
-    long getLong(RowData rowData, int pos);
-
-    /** Returns the float value at the given position. */
-    float getFloat(RowData rowData, int pos);
-
-    /** Returns the double value at the given position. */
-    double getDouble(RowData rowData, int pos);
-
-    /** Returns the string value at the given position. */
-    String getString(RowData rowData, int pos);
-
-    /**
-     * Returns the decimal value at the given position.
-     *
-     * <p>The precision and scale are required to determine whether the 
decimal value was stored in
-     * a compact representation (see {@link DecimalData}).
-     */
-    BigDecimal getDecimal(RowData rowData, int pos, int precision, int scale);
-
-    /**
-     * Returns the timestamp value at the given position.
-     *
-     * <p>The precision is required to determine whether the timestamp value 
was stored in a compact
-     * representation (see {@link TimestampData}).
-     */
-    Timestamp getTimestamp(RowData rowData, int pos, int precision);
-
-    /** Returns the binary value at the given position. */
-    byte[] getBinary(RowData rowData, int pos);
-
-    /** Returns the array value at the given position. */
-    Array getArray(RowData rowData, int pos);
-
-    /** Returns the map value at the given position. */
-    Map<?, ?> getMap(RowData rowData, int pos);
-
-    /**
-     * Returns the row value at the given position.
-     *
-     * <p>The number of fields is required to correctly extract the row.
-     */
-    RowData getRow(RowData rowData, int pos, int numFields);
-}
diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
index b99bd54a30f..11a91c416cb 100644
--- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
+++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
@@ -66,8 +66,7 @@ public class DatabaseMetaDataUtils {
         return new FlinkResultSet(
                 statement,
                 new CollectionResultIterator(catalogs.iterator()),
-                ResolvedSchema.of(TABLE_CAT_COLUMN),
-                StringDataConverter.CONVERTER);
+                ResolvedSchema.of(TABLE_CAT_COLUMN));
     }
 
     /**
@@ -104,7 +103,6 @@ public class DatabaseMetaDataUtils {
         return new FlinkResultSet(
                 statement,
                 new CollectionResultIterator(schemaWithCatalogList.iterator()),
-                ResolvedSchema.of(TABLE_SCHEM_COLUMN, TABLE_CATALOG_COLUMN),
-                StringDataConverter.CONVERTER);
+                ResolvedSchema.of(TABLE_SCHEM_COLUMN, TABLE_CATALOG_COLUMN));
     }
 }
diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java
deleted file mode 100644
index c5b65092df2..00000000000
--- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.jdbc.utils;
-
-import org.apache.flink.table.data.RowData;
-
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Timestamp;
-import java.util.Map;
-
-/** Default data converter for result set. */
-public class DefaultDataConverter implements DataConverter {
-    public static final DataConverter CONVERTER = new DefaultDataConverter();
-
-    private DefaultDataConverter() {}
-
-    @Override
-    public boolean getBoolean(RowData rowData, int pos) {
-        return !rowData.isNullAt(pos) && rowData.getBoolean(pos);
-    }
-
-    @Override
-    public byte getByte(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : rowData.getByte(pos);
-    }
-
-    @Override
-    public short getShort(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : rowData.getShort(pos);
-    }
-
-    @Override
-    public int getInt(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : rowData.getInt(pos);
-    }
-
-    @Override
-    public long getLong(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : rowData.getLong(pos);
-    }
-
-    @Override
-    public float getFloat(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : rowData.getFloat(pos);
-    }
-
-    @Override
-    public double getDouble(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : rowData.getDouble(pos);
-    }
-
-    @Override
-    public String getString(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? null : 
rowData.getString(pos).toString();
-    }
-
-    @Override
-    public BigDecimal getDecimal(RowData rowData, int pos, int precision, int 
scale) {
-        return rowData.isNullAt(pos)
-                ? null
-                : rowData.getDecimal(pos, precision, scale).toBigDecimal();
-    }
-
-    @Override
-    public Timestamp getTimestamp(RowData rowData, int pos, int precision) {
-        return rowData.isNullAt(pos) ? null : rowData.getTimestamp(pos, 
precision).toTimestamp();
-    }
-
-    @Override
-    public byte[] getBinary(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? null : rowData.getBinary(pos);
-    }
-
-    @Override
-    public Array getArray(RowData rowData, int pos) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Map<?, ?> getMap(RowData rowData, int pos) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public RowData getRow(RowData rowData, int pos, int numFields) {
-        return rowData.getRow(pos, numFields);
-    }
-}
diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java
deleted file mode 100644
index cd8e8e0f1a3..00000000000
--- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.jdbc.utils;
-
-import org.apache.flink.table.data.RowData;
-
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Timestamp;
-import java.util.Map;
-
-/** Converter string value to different value. */
-public class StringDataConverter implements DataConverter {
-    public static final DataConverter CONVERTER = new StringDataConverter();
-
-    private StringDataConverter() {}
-
-    @Override
-    public boolean getBoolean(RowData rowData, int pos) {
-        return Boolean.parseBoolean(getString(rowData, pos));
-    }
-
-    @Override
-    public byte getByte(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : Byte.parseByte(getString(rowData, 
pos));
-    }
-
-    @Override
-    public short getShort(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : Short.parseShort(getString(rowData, 
pos));
-    }
-
-    @Override
-    public int getInt(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : Integer.parseInt(getString(rowData, 
pos));
-    }
-
-    @Override
-    public long getLong(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : Long.parseLong(getString(rowData, 
pos));
-    }
-
-    @Override
-    public float getFloat(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : Float.parseFloat(getString(rowData, 
pos));
-    }
-
-    @Override
-    public double getDouble(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? 0 : 
Double.parseDouble(getString(rowData, pos));
-    }
-
-    @Override
-    public String getString(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? null : 
rowData.getString(pos).toString();
-    }
-
-    @Override
-    public BigDecimal getDecimal(RowData rowData, int pos, int precision, int 
scale) {
-        return rowData.isNullAt(pos)
-                ? null
-                : new BigDecimal(getString(rowData, pos)).setScale(scale);
-    }
-
-    @Override
-    public byte[] getBinary(RowData rowData, int pos) {
-        return rowData.isNullAt(pos) ? null : rowData.getString(pos).toBytes();
-    }
-
-    @Override
-    public Timestamp getTimestamp(RowData rowData, int pos, int precision) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Array getArray(RowData rowData, int pos) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Map<?, ?> getMap(RowData rowData, int pos) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public RowData getRow(RowData rowData, int pos, int numFields) {
-        throw new UnsupportedOperationException();
-    }
-}
diff --git a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
index 6c68885d18d..88e7756e1d4 100644
--- a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
+++ b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.jdbc.utils.DefaultDataConverter;
-import org.apache.flink.table.jdbc.utils.StringDataConverter;
 import org.apache.flink.util.CloseableIterator;
 
 import org.junit.jupiter.api.Test;
@@ -38,7 +36,6 @@ import java.math.BigDecimal;
 import java.sql.ResultSet;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.stream.IntStream;
 
@@ -93,8 +90,7 @@ public class FlinkResultSetTest {
                 new FlinkResultSet(
                         new TestingStatement(),
                         new StatementResult(
-                                SCHEMA, data, true, ResultKind.SUCCESS, 
JobID.generate()),
-                        DefaultDataConverter.CONVERTER)) {
+                                SCHEMA, data, true, ResultKind.SUCCESS, 
JobID.generate()))) {
             validateResultData(resultSet);
         }
     }
@@ -107,27 +103,28 @@ public class FlinkResultSetTest {
                                 .boxed()
                                 .map(
                                         v ->
-                                                stringRowData(
-                                                        v % 2 == 0,
-                                                        v.byteValue(),
-                                                        v.shortValue(),
-                                                        v,
-                                                        v.longValue(),
-                                                        (float) (v + 0.1),
-                                                        v + 0.22,
-                                                        
DecimalData.fromBigDecimal(
-                                                                new 
BigDecimal(v + ".55555"),
-                                                                10,
-                                                                5),
-                                                        
StringData.fromString(v.toString()),
-                                                        v.toString()))
+                                                (RowData)
+                                                        GenericRowData.of(
+                                                                v % 2 == 0,
+                                                                v.byteValue(),
+                                                                v.shortValue(),
+                                                                v,
+                                                                v.longValue(),
+                                                                (float) (v + 
0.1),
+                                                                v + 0.22,
+                                                                
DecimalData.fromBigDecimal(
+                                                                        new 
BigDecimal(
+                                                                               
 v + ".55555"),
+                                                                        10,
+                                                                        5),
+                                                                
StringData.fromString(v.toString()),
+                                                                
v.toString().getBytes()))
                                 .iterator());
         try (ResultSet resultSet =
                 new FlinkResultSet(
                         new TestingStatement(),
                         new StatementResult(
-                                SCHEMA, data, true, ResultKind.SUCCESS, 
JobID.generate()),
-                        StringDataConverter.CONVERTER)) {
+                                SCHEMA, data, true, ResultKind.SUCCESS, 
JobID.generate()))) {
             validateResultData(resultSet);
         }
     }
@@ -146,8 +143,7 @@ public class FlinkResultSetTest {
                 new FlinkResultSet(
                         new TestingStatement(),
                         new StatementResult(
-                                SCHEMA, data, true, ResultKind.SUCCESS, 
JobID.generate()),
-                        StringDataConverter.CONVERTER)) {
+                                SCHEMA, data, true, ResultKind.SUCCESS, 
JobID.generate()))) {
             assertTrue(resultSet.next());
             assertFalse(resultSet.getBoolean(1));
             assertEquals((byte) 0, resultSet.getByte(2));
@@ -163,11 +159,6 @@ public class FlinkResultSetTest {
         }
     }
 
-    private RowData stringRowData(Object... values) {
-        return GenericRowData.of(
-                Arrays.stream(values).map(v -> 
StringData.fromString(v.toString())).toArray());
-    }
-
     private static void validateResultData(ResultSet resultSet) throws 
SQLException {
         int resultCount = 0;
         while (resultSet.next()) {

Reply via email to