This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 81902e81c43 [FLINK-32211][sql-client] Supports row format in executor 81902e81c43 is described below commit 81902e81c43990c48d05ec81469b7d99c7922698 Author: Shammon FY <zjur...@gmail.com> AuthorDate: Mon May 29 15:01:29 2023 +0800 [FLINK-32211][sql-client] Supports row format in executor In this commit, we also removed DataConverter in flink-sql-jdbc-driver module. Close apache/flink#22671 --- .../flink/table/client/gateway/Executor.java | 9 ++ .../flink/table/client/gateway/ExecutorImpl.java | 37 +++++++- .../apache/flink/table/jdbc/FlinkConnection.java | 4 +- .../apache/flink/table/jdbc/FlinkResultSet.java | 50 +++++----- .../apache/flink/table/jdbc/FlinkStatement.java | 5 +- .../flink/table/jdbc/utils/DataConverter.java | 88 ----------------- .../table/jdbc/utils/DatabaseMetaDataUtils.java | 6 +- .../table/jdbc/utils/DefaultDataConverter.java | 105 --------------------- .../table/jdbc/utils/StringDataConverter.java | 105 --------------------- .../flink/table/jdbc/FlinkResultSetTest.java | 47 ++++----- 10 files changed, 89 insertions(+), 367 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java index b636d326560..3be128e0708 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java @@ -19,6 +19,7 @@ package org.apache.flink.table.client.gateway; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.gateway.rest.util.RowFormat; import org.apache.flink.table.gateway.service.context.DefaultContext; import java.io.Closeable; @@ -36,6 +37,14 @@ public interface Executor extends Closeable { return new ExecutorImpl(defaultContext, address, sessionId); } + static Executor create( + DefaultContext defaultContext, + InetSocketAddress address, + String sessionId, + RowFormat rowFormat) { + return new ExecutorImpl(defaultContext, address, sessionId, rowFormat); + } + static Executor create(DefaultContext defaultContext, URL address, String sessionId) { return new ExecutorImpl(defaultContext, address, sessionId); } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java index 4ec7eb3e8eb..f4f6b40ed56 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java @@ -112,6 +112,7 @@ public class ExecutorImpl implements Executor { private final SqlGatewayRestAPIVersion connectionVersion; private final SessionHandle sessionHandle; + private final RowFormat rowFormat; public ExecutorImpl( DefaultContext defaultContext, InetSocketAddress gatewayAddress, String sessionId) { @@ -119,11 +120,30 @@ public class ExecutorImpl implements Executor { defaultContext, NetUtils.socketToUrl(gatewayAddress), sessionId, - HEARTBEAT_INTERVAL_MILLISECONDS); + HEARTBEAT_INTERVAL_MILLISECONDS, + RowFormat.PLAIN_TEXT); + } + + public ExecutorImpl( + DefaultContext defaultContext, + InetSocketAddress gatewayAddress, + String sessionId, + RowFormat rowFormat) { + this( + defaultContext, + NetUtils.socketToUrl(gatewayAddress), + sessionId, + HEARTBEAT_INTERVAL_MILLISECONDS, + rowFormat); } public ExecutorImpl(DefaultContext defaultContext, URL gatewayUrl, String sessionId) { - this(defaultContext, gatewayUrl, sessionId, HEARTBEAT_INTERVAL_MILLISECONDS); + this( + defaultContext, + gatewayUrl, + sessionId, + HEARTBEAT_INTERVAL_MILLISECONDS, + RowFormat.PLAIN_TEXT); } @VisibleForTesting @@ -132,7 +152,12 @@ public class ExecutorImpl implements Executor { InetSocketAddress gatewayAddress, String sessionId, long heartbeatInterval) { - this(defaultContext, NetUtils.socketToUrl(gatewayAddress), sessionId, heartbeatInterval); + this( + defaultContext, + NetUtils.socketToUrl(gatewayAddress), + sessionId, + heartbeatInterval, + RowFormat.PLAIN_TEXT); } @VisibleForTesting @@ -140,9 +165,11 @@ public class ExecutorImpl implements Executor { DefaultContext defaultContext, URL gatewayUrl, String sessionId, - long heartbeatInterval) { + long heartbeatInterval, + RowFormat rowFormat) { this.registry = new AutoCloseableRegistry(); this.gatewayUrl = gatewayUrl; + this.rowFormat = rowFormat; try { // register required resource this.executorService = Executors.newCachedThreadPool(); @@ -433,7 +460,7 @@ public class ExecutorImpl implements Executor { return sendRequest( FetchResultsHeaders.getDefaultInstance(), new FetchResultsMessageParameters( - sessionHandle, operationHandle, token, RowFormat.PLAIN_TEXT), + sessionHandle, operationHandle, token, rowFormat), EmptyRequestBody.getInstance()) .get(); } catch (InterruptedException e) { diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java index 8c0f1a237c6..7fdc99aeddf 100644 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java @@ -21,6 +21,7 @@ package org.apache.flink.table.jdbc; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.client.gateway.Executor; import org.apache.flink.table.client.gateway.StatementResult; +import org.apache.flink.table.gateway.rest.util.RowFormat; import org.apache.flink.table.gateway.service.context.DefaultContext; import org.apache.flink.table.jdbc.utils.DriverUtils; @@ -59,7 +60,8 @@ public class FlinkConnection extends BaseConnection { DriverUtils.fromProperties(driverUri.getProperties()), Collections.emptyList()), driverUri.getAddress(), - UUID.randomUUID().toString()); + UUID.randomUUID().toString(), + RowFormat.JSON); driverUri.getCatalog().ifPresent(this::setSessionCatalog); driverUri.getDatabase().ifPresent(this::setSessionSchema); } diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java index 9bd6b6fe2f8..6b8306d4cee 100644 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java @@ -21,8 +21,8 @@ package org.apache.flink.table.jdbc; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.client.gateway.StatementResult; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.apache.flink.table.jdbc.utils.CloseableResultIterator; -import org.apache.flink.table.jdbc.utils.DataConverter; import org.apache.flink.table.jdbc.utils.StatementResultIterator; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.DecimalType; @@ -52,30 +52,20 @@ public class FlinkResultSet extends BaseResultSet { private final List<String> columnNameList; private final Statement statement; private final CloseableResultIterator<RowData> iterator; - private final DataConverter dataConverter; private final FlinkResultSetMetaData resultSetMetaData; private RowData currentRow; private boolean wasNull; private volatile boolean closed; - public FlinkResultSet( - Statement statement, StatementResult result, DataConverter dataConverter) { - this( - statement, - new StatementResultIterator(result), - result.getResultSchema(), - dataConverter); + public FlinkResultSet(Statement statement, StatementResult result) { + this(statement, new StatementResultIterator(result), result.getResultSchema()); } public FlinkResultSet( - Statement statement, - CloseableResultIterator<RowData> iterator, - ResolvedSchema schema, - DataConverter dataConverter) { + Statement statement, CloseableResultIterator<RowData> iterator, ResolvedSchema schema) { this.statement = checkNotNull(statement, "Statement cannot be null"); this.iterator = checkNotNull(iterator, "Statement result cannot be null"); - this.dataConverter = checkNotNull(dataConverter, "Data converter cannot be null"); this.currentRow = null; this.wasNull = false; @@ -155,8 +145,9 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); + StringData stringData = currentRow.getString(columnIndex - 1); try { - return dataConverter.getString(currentRow, columnIndex - 1); + return stringData == null ? null : stringData.toString(); } catch (Exception e) { throw new SQLDataException(e); } @@ -168,7 +159,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { - return dataConverter.getBoolean(currentRow, columnIndex - 1); + return !currentRow.isNullAt(columnIndex - 1) && currentRow.getBoolean(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -180,7 +171,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { - return dataConverter.getByte(currentRow, columnIndex - 1); + return currentRow.isNullAt(columnIndex - 1) ? 0 : currentRow.getByte(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -192,7 +183,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { - return dataConverter.getShort(currentRow, columnIndex - 1); + return currentRow.isNullAt(columnIndex - 1) ? 0 : currentRow.getShort(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -204,7 +195,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { - return dataConverter.getInt(currentRow, columnIndex - 1); + return currentRow.isNullAt(columnIndex - 1) ? 0 : currentRow.getInt(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -217,7 +208,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidColumn(columnIndex); try { - return dataConverter.getLong(currentRow, columnIndex - 1); + return currentRow.isNullAt(columnIndex - 1) ? 0L : currentRow.getLong(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -229,7 +220,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { - return dataConverter.getFloat(currentRow, columnIndex - 1); + return currentRow.isNullAt(columnIndex - 1) ? 0 : currentRow.getFloat(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -241,7 +232,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { - return dataConverter.getDouble(currentRow, columnIndex - 1); + return currentRow.isNullAt(columnIndex - 1) ? 0 : currentRow.getDouble(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -258,7 +249,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { - return dataConverter.getBinary(currentRow, columnIndex - 1); + return currentRow.getBinary(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -392,11 +383,14 @@ public class FlinkResultSet extends BaseResultSet { } DecimalType decimalType = (DecimalType) dataType.getLogicalType(); try { - return dataConverter.getDecimal( - currentRow, - columnIndex - 1, - decimalType.getPrecision(), - decimalType.getScale()); + return currentRow.isNullAt(columnIndex - 1) + ? null + : currentRow + .getDecimal( + columnIndex - 1, + decimalType.getPrecision(), + decimalType.getScale()) + .toBigDecimal(); } catch (Exception e) { throw new SQLDataException(e); } diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java index 9400578b2dd..f0394bdda1a 100644 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java @@ -21,7 +21,6 @@ package org.apache.flink.table.jdbc; import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.client.gateway.Executor; import org.apache.flink.table.client.gateway.StatementResult; -import org.apache.flink.table.jdbc.utils.StringDataConverter; import javax.annotation.concurrent.NotThreadSafe; @@ -58,7 +57,7 @@ public class FlinkStatement extends BaseStatement { if (!result.isQueryResult()) { throw new SQLException(String.format("Statement[%s] is not a query.", sql)); } - currentResults = new FlinkResultSet(this, result, StringDataConverter.CONVERTER); + currentResults = new FlinkResultSet(this, result); return currentResults; } @@ -106,7 +105,7 @@ public class FlinkStatement extends BaseStatement { public boolean execute(String sql) throws SQLException { StatementResult result = executeInternal(sql); if (result.isQueryResult() || result.getResultKind() == ResultKind.SUCCESS_WITH_CONTENT) { - currentResults = new FlinkResultSet(this, result, StringDataConverter.CONVERTER); + currentResults = new FlinkResultSet(this, result); return true; } diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java deleted file mode 100644 index 1709932c8c1..00000000000 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.jdbc.utils; - -import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.TimestampData; - -import java.math.BigDecimal; -import java.sql.Array; -import java.sql.Timestamp; -import java.util.Map; - -/** Convert data from row data for result set. */ -public interface DataConverter { - - /** Returns the boolean value at the given position. */ - boolean getBoolean(RowData rowData, int pos); - - /** Returns the byte value at the given position. */ - byte getByte(RowData rowData, int pos); - - /** Returns the short value at the given position. */ - short getShort(RowData rowData, int pos); - - /** Returns the integer value at the given position. */ - int getInt(RowData rowData, int pos); - - /** Returns the long value at the given position. */ - long getLong(RowData rowData, int pos); - - /** Returns the float value at the given position. */ - float getFloat(RowData rowData, int pos); - - /** Returns the double value at the given position. */ - double getDouble(RowData rowData, int pos); - - /** Returns the string value at the given position. */ - String getString(RowData rowData, int pos); - - /** - * Returns the decimal value at the given position. - * - * <p>The precision and scale are required to determine whether the decimal value was stored in - * a compact representation (see {@link DecimalData}). - */ - BigDecimal getDecimal(RowData rowData, int pos, int precision, int scale); - - /** - * Returns the timestamp value at the given position. - * - * <p>The precision is required to determine whether the timestamp value was stored in a compact - * representation (see {@link TimestampData}). - */ - Timestamp getTimestamp(RowData rowData, int pos, int precision); - - /** Returns the binary value at the given position. */ - byte[] getBinary(RowData rowData, int pos); - - /** Returns the array value at the given position. */ - Array getArray(RowData rowData, int pos); - - /** Returns the map value at the given position. */ - Map<?, ?> getMap(RowData rowData, int pos); - - /** - * Returns the row value at the given position. - * - * <p>The number of fields is required to correctly extract the row. - */ - RowData getRow(RowData rowData, int pos, int numFields); -} diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java index b99bd54a30f..11a91c416cb 100644 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java @@ -66,8 +66,7 @@ public class DatabaseMetaDataUtils { return new FlinkResultSet( statement, new CollectionResultIterator(catalogs.iterator()), - ResolvedSchema.of(TABLE_CAT_COLUMN), - StringDataConverter.CONVERTER); + ResolvedSchema.of(TABLE_CAT_COLUMN)); } /** @@ -104,7 +103,6 @@ public class DatabaseMetaDataUtils { return new FlinkResultSet( statement, new CollectionResultIterator(schemaWithCatalogList.iterator()), - ResolvedSchema.of(TABLE_SCHEM_COLUMN, TABLE_CATALOG_COLUMN), - StringDataConverter.CONVERTER); + ResolvedSchema.of(TABLE_SCHEM_COLUMN, TABLE_CATALOG_COLUMN)); } } diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java deleted file mode 100644 index c5b65092df2..00000000000 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.jdbc.utils; - -import org.apache.flink.table.data.RowData; - -import java.math.BigDecimal; -import java.sql.Array; -import java.sql.Timestamp; -import java.util.Map; - -/** Default data converter for result set. */ -public class DefaultDataConverter implements DataConverter { - public static final DataConverter CONVERTER = new DefaultDataConverter(); - - private DefaultDataConverter() {} - - @Override - public boolean getBoolean(RowData rowData, int pos) { - return !rowData.isNullAt(pos) && rowData.getBoolean(pos); - } - - @Override - public byte getByte(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : rowData.getByte(pos); - } - - @Override - public short getShort(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : rowData.getShort(pos); - } - - @Override - public int getInt(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : rowData.getInt(pos); - } - - @Override - public long getLong(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : rowData.getLong(pos); - } - - @Override - public float getFloat(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : rowData.getFloat(pos); - } - - @Override - public double getDouble(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : rowData.getDouble(pos); - } - - @Override - public String getString(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? null : rowData.getString(pos).toString(); - } - - @Override - public BigDecimal getDecimal(RowData rowData, int pos, int precision, int scale) { - return rowData.isNullAt(pos) - ? null - : rowData.getDecimal(pos, precision, scale).toBigDecimal(); - } - - @Override - public Timestamp getTimestamp(RowData rowData, int pos, int precision) { - return rowData.isNullAt(pos) ? null : rowData.getTimestamp(pos, precision).toTimestamp(); - } - - @Override - public byte[] getBinary(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? null : rowData.getBinary(pos); - } - - @Override - public Array getArray(RowData rowData, int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public Map<?, ?> getMap(RowData rowData, int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public RowData getRow(RowData rowData, int pos, int numFields) { - return rowData.getRow(pos, numFields); - } -} diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java deleted file mode 100644 index cd8e8e0f1a3..00000000000 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.jdbc.utils; - -import org.apache.flink.table.data.RowData; - -import java.math.BigDecimal; -import java.sql.Array; -import java.sql.Timestamp; -import java.util.Map; - -/** Converter string value to different value. */ -public class StringDataConverter implements DataConverter { - public static final DataConverter CONVERTER = new StringDataConverter(); - - private StringDataConverter() {} - - @Override - public boolean getBoolean(RowData rowData, int pos) { - return Boolean.parseBoolean(getString(rowData, pos)); - } - - @Override - public byte getByte(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : Byte.parseByte(getString(rowData, pos)); - } - - @Override - public short getShort(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : Short.parseShort(getString(rowData, pos)); - } - - @Override - public int getInt(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : Integer.parseInt(getString(rowData, pos)); - } - - @Override - public long getLong(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : Long.parseLong(getString(rowData, pos)); - } - - @Override - public float getFloat(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : Float.parseFloat(getString(rowData, pos)); - } - - @Override - public double getDouble(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? 0 : Double.parseDouble(getString(rowData, pos)); - } - - @Override - public String getString(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? null : rowData.getString(pos).toString(); - } - - @Override - public BigDecimal getDecimal(RowData rowData, int pos, int precision, int scale) { - return rowData.isNullAt(pos) - ? null - : new BigDecimal(getString(rowData, pos)).setScale(scale); - } - - @Override - public byte[] getBinary(RowData rowData, int pos) { - return rowData.isNullAt(pos) ? null : rowData.getString(pos).toBytes(); - } - - @Override - public Timestamp getTimestamp(RowData rowData, int pos, int precision) { - throw new UnsupportedOperationException(); - } - - @Override - public Array getArray(RowData rowData, int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public Map<?, ?> getMap(RowData rowData, int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public RowData getRow(RowData rowData, int pos, int numFields) { - throw new UnsupportedOperationException(); - } -} diff --git a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java index 6c68885d18d..88e7756e1d4 100644 --- a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java +++ b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java @@ -28,8 +28,6 @@ import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; -import org.apache.flink.table.jdbc.utils.DefaultDataConverter; -import org.apache.flink.table.jdbc.utils.StringDataConverter; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; @@ -38,7 +36,6 @@ import java.math.BigDecimal; import java.sql.ResultSet; import java.sql.SQLDataException; import java.sql.SQLException; -import java.util.Arrays; import java.util.Collections; import java.util.stream.IntStream; @@ -93,8 +90,7 @@ public class FlinkResultSetTest { new FlinkResultSet( new TestingStatement(), new StatementResult( - SCHEMA, data, true, ResultKind.SUCCESS, JobID.generate()), - DefaultDataConverter.CONVERTER)) { + SCHEMA, data, true, ResultKind.SUCCESS, JobID.generate()))) { validateResultData(resultSet); } } @@ -107,27 +103,28 @@ public class FlinkResultSetTest { .boxed() .map( v -> - stringRowData( - v % 2 == 0, - v.byteValue(), - v.shortValue(), - v, - v.longValue(), - (float) (v + 0.1), - v + 0.22, - DecimalData.fromBigDecimal( - new BigDecimal(v + ".55555"), - 10, - 5), - StringData.fromString(v.toString()), - v.toString())) + (RowData) + GenericRowData.of( + v % 2 == 0, + v.byteValue(), + v.shortValue(), + v, + v.longValue(), + (float) (v + 0.1), + v + 0.22, + DecimalData.fromBigDecimal( + new BigDecimal( + v + ".55555"), + 10, + 5), + StringData.fromString(v.toString()), + v.toString().getBytes())) .iterator()); try (ResultSet resultSet = new FlinkResultSet( new TestingStatement(), new StatementResult( - SCHEMA, data, true, ResultKind.SUCCESS, JobID.generate()), - StringDataConverter.CONVERTER)) { + SCHEMA, data, true, ResultKind.SUCCESS, JobID.generate()))) { validateResultData(resultSet); } } @@ -146,8 +143,7 @@ public class FlinkResultSetTest { new FlinkResultSet( new TestingStatement(), new StatementResult( - SCHEMA, data, true, ResultKind.SUCCESS, JobID.generate()), - StringDataConverter.CONVERTER)) { + SCHEMA, data, true, ResultKind.SUCCESS, JobID.generate()))) { assertTrue(resultSet.next()); assertFalse(resultSet.getBoolean(1)); assertEquals((byte) 0, resultSet.getByte(2)); @@ -163,11 +159,6 @@ public class FlinkResultSetTest { } } - private RowData stringRowData(Object... values) { - return GenericRowData.of( - Arrays.stream(values).map(v -> StringData.fromString(v.toString())).toArray()); - } - private static void validateResultData(ResultSet resultSet) throws SQLException { int resultCount = 0; while (resultSet.next()) {