This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 05d721f5d48 Properly handle number sign in MySQL Proxy binary protocol (#23430)
05d721f5d48 is described below

commit 05d721f5d484a84d2fb4023bf9e2b4799fe573e0
Author: 吴伟杰 <[email protected]>
AuthorDate: Mon Jan 9 16:47:56 2023 +0800

    Properly handle number sign in MySQL Proxy binary protocol (#23430)
---
 .../binary/execute/MySQLComStmtExecutePacket.java  |  9 ++-
 .../execute/protocol/MySQLBinaryProtocolValue.java |  3 +-
 .../protocol/MySQLDateBinaryProtocolValue.java     |  2 +-
 .../protocol/MySQLDoubleBinaryProtocolValue.java   |  2 +-
 .../protocol/MySQLFloatBinaryProtocolValue.java    |  2 +-
 .../protocol/MySQLInt1BinaryProtocolValue.java     |  8 +-
 .../protocol/MySQLInt2BinaryProtocolValue.java     |  8 +-
 .../protocol/MySQLInt4BinaryProtocolValue.java     |  8 +-
 .../protocol/MySQLInt8BinaryProtocolValue.java     |  2 +-
 .../MySQLStringLenencBinaryProtocolValue.java      |  2 +-
 .../protocol/MySQLTimeBinaryProtocolValue.java     |  2 +-
 .../execute/MySQLComStmtExecutePacketTest.java     |  6 +-
 .../protocol/MySQLDateBinaryProtocolValueTest.java | 10 +--
 .../MySQLDoubleBinaryProtocolValueTest.java        |  2 +-
 .../MySQLFloatBinaryProtocolValueTest.java         |  2 +-
 .../protocol/MySQLInt1BinaryProtocolValueTest.java |  6 +-
 .../protocol/MySQLInt2BinaryProtocolValueTest.java |  6 +-
 .../protocol/MySQLInt4BinaryProtocolValueTest.java |  6 +-
 .../protocol/MySQLInt8BinaryProtocolValueTest.java |  2 +-
 .../MySQLStringLenencBinaryProtocolValueTest.java  |  2 +-
 .../protocol/MySQLTimeBinaryProtocolValueTest.java |  8 +-
 .../query/binary/MySQLServerPreparedStatement.java |  2 +
 .../execute/MySQLComStmtExecuteExecutor.java       |  2 +-
 .../prepare/MySQLComStmtPrepareExecutor.java       | 63 ++++++++++-----
 ...ySQLComStmtPrepareParameterMarkerExtractor.java | 89 ++++++++++++++++++++++
 .../command/MySQLCommandPacketFactoryTest.java     |  4 +-
 .../admin/MySQLComResetConnectionExecutorTest.java |  3 +-
 .../MySQLComStmtSendLongDataExecutorTest.java      |  2 +-
 .../execute/MySQLComStmtExecuteExecutorTest.java   |  6 +-
 .../prepare/MySQLComStmtPrepareExecutorTest.java   | 60 ++++++++++++---
 ...ComStmtPrepareParameterMarkerExtractorTest.java | 62 +++++++++++++++
 .../reset/MySQLComStmtResetExecutorTest.java       |  3 +-
 ...insert_min_values_into_single_table_integer.xml | 28 -------
 ...negative_smallint_into_single_table_integer.xml | 28 -------
 ...t_negative_values_into_single_table_integer.xml | 28 -------
 .../cases/dml/dml-integration-test-cases.xml       | 10 +--
 36 files changed, 319 insertions(+), 169 deletions(-)

diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
index bb941eed3f9..0dd5f088b1e 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinitionFlag;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementParameterType;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.protocol.MySQLBinaryProtocolValue;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.protocol.MySQLBinaryProtocolValueFactory;
@@ -97,10 +98,12 @@ public final class MySQLComStmtExecutePacket extends 
MySQLCommandPacket {
      *
      * @param paramTypes parameter type of values
      * @param longDataIndexes indexes of long data
+     * @param parameterFlags column definition flag of parameters
      * @return parameter values
      * @throws SQLException SQL exception
      */
-    public List<Object> readParameters(final 
List<MySQLPreparedStatementParameterType> paramTypes, final Set<Integer> 
longDataIndexes) throws SQLException {
+    public List<Object> readParameters(final 
List<MySQLPreparedStatementParameterType> paramTypes, final Set<Integer> 
longDataIndexes,
+                                       final List<Integer> parameterFlags) 
throws SQLException {
         List<Object> result = new ArrayList<>(paramTypes.size());
         for (int paramIndex = 0; paramIndex < paramTypes.size(); paramIndex++) 
{
             if (longDataIndexes.contains(paramIndex)) {
@@ -108,7 +111,9 @@ public final class MySQLComStmtExecutePacket extends 
MySQLCommandPacket {
                 continue;
             }
             MySQLBinaryProtocolValue binaryProtocolValue = 
MySQLBinaryProtocolValueFactory.getBinaryProtocolValue(paramTypes.get(paramIndex).getColumnType());
-            result.add(nullBitmap.isNullParameter(paramIndex) ? null : 
binaryProtocolValue.read(payload));
+            Object value = nullBitmap.isNullParameter(paramIndex) ? null
+                    : binaryProtocolValue.read(payload, 
(parameterFlags.get(paramIndex) & 
MySQLColumnDefinitionFlag.UNSIGNED.getValue()) == 
MySQLColumnDefinitionFlag.UNSIGNED.getValue());
+            result.add(value);
         }
         return result;
     }
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLBinaryProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLBinaryProtocolValue.java
index e9d51df4ecb..a0c6e43d919 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLBinaryProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLBinaryProtocolValue.java
@@ -32,10 +32,11 @@ public interface MySQLBinaryProtocolValue {
      * Read binary protocol value.
      *
      * @param payload payload operation for MySQL packet
+     * @param unsigned is unsigned value
      * @return binary value result
      * @throws SQLException SQL exception
      */
-    Object read(MySQLPacketPayload payload) throws SQLException;
+    Object read(MySQLPacketPayload payload, boolean unsigned) throws 
SQLException;
     
     /**
      * Write binary protocol value.
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDateBinaryProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDateBinaryProtocolValue.java
index 7a9a711743f..8dc112aadde 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDateBinaryProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDateBinaryProtocolValue.java
@@ -31,7 +31,7 @@ import java.util.Date;
 public final class MySQLDateBinaryProtocolValue implements 
MySQLBinaryProtocolValue {
     
     @Override
-    public Object read(final MySQLPacketPayload payload) throws SQLException {
+    public Object read(final MySQLPacketPayload payload, final boolean 
unsigned) throws SQLException {
         int length = payload.readInt1();
         switch (length) {
             case 0:
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDoubleBinaryProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDoubleBinaryProtocolValue.java
index 9609a1be945..4e523a0195a 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDoubleBinaryProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDoubleBinaryProtocolValue.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 public final class MySQLDoubleBinaryProtocolValue implements 
MySQLBinaryProtocolValue {
     
     @Override
-    public Object read(final MySQLPacketPayload payload) {
+    public Object read(final MySQLPacketPayload payload, final boolean 
unsigned) {
         return payload.getByteBuf().readDoubleLE();
     }
     
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLFloatBinaryProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLFloatBinaryProtocolValue.java
index 3074c64fd6a..74881a88cdf 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLFloatBinaryProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLFloatBinaryProtocolValue.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 public final class MySQLFloatBinaryProtocolValue implements 
MySQLBinaryProtocolValue {
     
     @Override
-    public Object read(final MySQLPacketPayload payload) {
+    public Object read(final MySQLPacketPayload payload, final boolean 
unsigned) {
         return payload.getByteBuf().readFloatLE();
     }
     
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt1BinaryProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt1BinaryProtocolValue.java
index f74eec1af2e..a8666dea338 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt1BinaryProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt1BinaryProtocolValue.java
@@ -24,9 +24,13 @@ import 
org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
  */
 public final class MySQLInt1BinaryProtocolValue implements 
MySQLBinaryProtocolValue {
     
+    @SuppressWarnings("SimplifiableIfStatement")
     @Override
-    public Object read(final MySQLPacketPayload payload) {
-        return payload.readInt1();
+    public Object read(final MySQLPacketPayload payload, final boolean 
unsigned) {
+        if (unsigned) {
+            return payload.getByteBuf().readUnsignedByte();
+        }
+        return payload.getByteBuf().readByte();
     }
     
     @Override
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt2BinaryProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt2BinaryProtocolValue.java
index eff513341e3..f83f3d6ecd1 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt2BinaryProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt2BinaryProtocolValue.java
@@ -24,9 +24,13 @@ import 
org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
  */
 public final class MySQLInt2BinaryProtocolValue implements 
MySQLBinaryProtocolValue {
     
+    @SuppressWarnings("SimplifiableIfStatement")
     @Override
-    public Object read(final MySQLPacketPayload payload) {
-        return payload.readInt2();
+    public Object read(final MySQLPacketPayload payload, final boolean 
unsigned) {
+        if (unsigned) {
+            return payload.getByteBuf().readUnsignedShortLE();
+        }
+        return payload.getByteBuf().readShortLE();
     }
     
     @Override
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt4BinaryProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt4BinaryProtocolValue.java
index d39569746a7..5dc130c8b97 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt4BinaryProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt4BinaryProtocolValue.java
@@ -26,9 +26,13 @@ import java.math.BigDecimal;
  */
 public final class MySQLInt4BinaryProtocolValue implements 
MySQLBinaryProtocolValue {
     
+    @SuppressWarnings("SimplifiableIfStatement")
     @Override
-    public Object read(final MySQLPacketPayload payload) {
-        return payload.readInt4();
+    public Object read(final MySQLPacketPayload payload, final boolean 
unsigned) {
+        if (unsigned) {
+            return payload.getByteBuf().readUnsignedIntLE();
+        }
+        return payload.getByteBuf().readIntLE();
     }
     
     @Override
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt8BinaryProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt8BinaryProtocolValue.java
index 2620d62ffc2..c23ec92e3c4 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt8BinaryProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt8BinaryProtocolValue.java
@@ -28,7 +28,7 @@ import java.math.BigInteger;
 public final class MySQLInt8BinaryProtocolValue implements 
MySQLBinaryProtocolValue {
     
     @Override
-    public Object read(final MySQLPacketPayload payload) {
+    public Object read(final MySQLPacketPayload payload, final boolean 
unsigned) {
         return payload.readInt8();
     }
     
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLStringLenencBinaryProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLStringLenencBinaryProtocolValue.java
index 8145260658c..f2a8acf48c5 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLStringLenencBinaryProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLStringLenencBinaryProtocolValue.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 public final class MySQLStringLenencBinaryProtocolValue implements 
MySQLBinaryProtocolValue {
     
     @Override
-    public Object read(final MySQLPacketPayload payload) {
+    public Object read(final MySQLPacketPayload payload, final boolean 
unsigned) {
         return payload.readStringLenenc();
     }
     
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLTimeBinaryProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLTimeBinaryProtocolValue.java
index 70ea99ea3de..8137aaa5925 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLTimeBinaryProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLTimeBinaryProtocolValue.java
@@ -31,7 +31,7 @@ import java.util.Calendar;
 public final class MySQLTimeBinaryProtocolValue implements 
MySQLBinaryProtocolValue {
     
     @Override
-    public Object read(final MySQLPacketPayload payload) throws SQLException {
+    public Object read(final MySQLPacketPayload payload, final boolean 
unsigned) throws SQLException {
         int length = payload.readInt1();
         payload.readInt1();
         payload.readInt4();
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
index 26ec86a2d66..516f37f6253 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
@@ -57,7 +57,7 @@ public final class MySQLComStmtExecutePacketTest {
         assertThat(parameterTypes.size(), is(1));
         assertThat(parameterTypes.get(0).getColumnType(), 
is(MySQLBinaryColumnType.MYSQL_TYPE_LONG));
         assertThat(parameterTypes.get(0).getUnsignedFlag(), is(0));
-        assertThat(actual.readParameters(parameterTypes, 
Collections.emptySet()), is(Collections.<Object>singletonList(1)));
+        assertThat(actual.readParameters(parameterTypes, 
Collections.emptySet(), Collections.singletonList(0)), 
is(Collections.<Object>singletonList(1)));
     }
     
     @Test
@@ -71,7 +71,7 @@ public final class MySQLComStmtExecutePacketTest {
         assertThat(parameterTypes.size(), is(1));
         assertThat(parameterTypes.get(0).getColumnType(), 
is(MySQLBinaryColumnType.MYSQL_TYPE_LONG));
         assertThat(parameterTypes.get(0).getUnsignedFlag(), is(0));
-        assertThat(actual.readParameters(parameterTypes, 
Collections.emptySet()), is(Collections.singletonList(null)));
+        assertThat(actual.readParameters(parameterTypes, 
Collections.emptySet(), Collections.emptyList()), 
is(Collections.singletonList(null)));
     }
     
     @Test
@@ -85,7 +85,7 @@ public final class MySQLComStmtExecutePacketTest {
         assertThat(parameterTypes.size(), is(1));
         assertThat(parameterTypes.get(0).getColumnType(), 
is(MySQLBinaryColumnType.MYSQL_TYPE_BLOB));
         assertThat(parameterTypes.get(0).getUnsignedFlag(), is(0));
-        assertThat(actual.readParameters(parameterTypes, 
Collections.singleton(0)), is(Collections.singletonList(null)));
+        assertThat(actual.readParameters(parameterTypes, 
Collections.singleton(0), Collections.emptyList()), 
is(Collections.singletonList(null)));
         assertThat(actual.toString(), 
is("MySQLComStmtExecutePacket(statementId=2)"));
     }
 }
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDateBinaryProtocolValueTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDateBinaryProtocolValueTest.java
index a0eecb586b2..4f04de8b621 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDateBinaryProtocolValueTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDateBinaryProtocolValueTest.java
@@ -41,7 +41,7 @@ public final class MySQLDateBinaryProtocolValueTest {
     
     @Test(expected = SQLFeatureNotSupportedException.class)
     public void assertReadWithZeroByte() throws SQLException {
-        new MySQLDateBinaryProtocolValue().read(payload);
+        new MySQLDateBinaryProtocolValue().read(payload, false);
     }
     
     @Test
@@ -49,7 +49,7 @@ public final class MySQLDateBinaryProtocolValueTest {
         when(payload.readInt1()).thenReturn(4, 12, 31);
         when(payload.readInt2()).thenReturn(2018);
         Calendar actual = Calendar.getInstance();
-        actual.setTimeInMillis(((Timestamp) new 
MySQLDateBinaryProtocolValue().read(payload)).getTime());
+        actual.setTimeInMillis(((Timestamp) new 
MySQLDateBinaryProtocolValue().read(payload, false)).getTime());
         assertThat(actual.get(Calendar.YEAR), is(2018));
         assertThat(actual.get(Calendar.MONTH), is(Calendar.DECEMBER));
         assertThat(actual.get(Calendar.DAY_OF_MONTH), is(31));
@@ -60,7 +60,7 @@ public final class MySQLDateBinaryProtocolValueTest {
         when(payload.readInt1()).thenReturn(7, 12, 31, 10, 59, 0);
         when(payload.readInt2()).thenReturn(2018);
         Calendar actual = Calendar.getInstance();
-        actual.setTimeInMillis(((Timestamp) new 
MySQLDateBinaryProtocolValue().read(payload)).getTime());
+        actual.setTimeInMillis(((Timestamp) new 
MySQLDateBinaryProtocolValue().read(payload, false)).getTime());
         assertThat(actual.get(Calendar.YEAR), is(2018));
         assertThat(actual.get(Calendar.MONTH), is(Calendar.DECEMBER));
         assertThat(actual.get(Calendar.DAY_OF_MONTH), is(31));
@@ -75,7 +75,7 @@ public final class MySQLDateBinaryProtocolValueTest {
         when(payload.readInt2()).thenReturn(2018);
         when(payload.readInt4()).thenReturn(232323);
         Calendar actual = Calendar.getInstance();
-        Timestamp actualTimestamp = (Timestamp) new 
MySQLDateBinaryProtocolValue().read(payload);
+        Timestamp actualTimestamp = (Timestamp) new 
MySQLDateBinaryProtocolValue().read(payload, false);
         actual.setTimeInMillis(actualTimestamp.getTime());
         assertThat(actual.get(Calendar.YEAR), is(2018));
         assertThat(actual.get(Calendar.MONTH), is(Calendar.DECEMBER));
@@ -89,7 +89,7 @@ public final class MySQLDateBinaryProtocolValueTest {
     @Test(expected = SQLFeatureNotSupportedException.class)
     public void assertReadWithIllegalArgument() throws SQLException {
         when(payload.readInt1()).thenReturn(100);
-        new MySQLDateBinaryProtocolValue().read(payload);
+        new MySQLDateBinaryProtocolValue().read(payload, false);
     }
     
     @Test
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDoubleBinaryProtocolValueTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDoubleBinaryProtocolValueTest.java
index a7458e5da9c..5e4757fbf5c 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDoubleBinaryProtocolValueTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLDoubleBinaryProtocolValueTest.java
@@ -40,7 +40,7 @@ public final class MySQLDoubleBinaryProtocolValueTest {
     @Test
     public void assertRead() {
         when(byteBuf.readDoubleLE()).thenReturn(1.0d);
-        assertThat(new MySQLDoubleBinaryProtocolValue().read(new 
MySQLPacketPayload(byteBuf, StandardCharsets.UTF_8)), is(1.0d));
+        assertThat(new MySQLDoubleBinaryProtocolValue().read(new 
MySQLPacketPayload(byteBuf, StandardCharsets.UTF_8), false), is(1.0d));
     }
     
     @Test
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLFloatBinaryProtocolValueTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLFloatBinaryProtocolValueTest.java
index 0b1a04571a7..f0a1b9c09fd 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLFloatBinaryProtocolValueTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLFloatBinaryProtocolValueTest.java
@@ -40,7 +40,7 @@ public final class MySQLFloatBinaryProtocolValueTest {
     @Test
     public void assertRead() {
         when(byteBuf.readFloatLE()).thenReturn(1.0f);
-        assertThat(new MySQLFloatBinaryProtocolValue().read(new 
MySQLPacketPayload(byteBuf, StandardCharsets.UTF_8)), is(1.0f));
+        assertThat(new MySQLFloatBinaryProtocolValue().read(new 
MySQLPacketPayload(byteBuf, StandardCharsets.UTF_8), false), is(1.0f));
     }
     
     @Test
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt1BinaryProtocolValueTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt1BinaryProtocolValueTest.java
index 594f1f2d21e..457c541d611 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt1BinaryProtocolValueTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt1BinaryProtocolValueTest.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.protocol;
 
+import io.netty.buffer.Unpooled;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,8 +37,9 @@ public final class MySQLInt1BinaryProtocolValueTest {
     
     @Test
     public void assertRead() {
-        when(payload.readInt1()).thenReturn(1);
-        assertThat(new MySQLInt1BinaryProtocolValue().read(payload), is(1));
+        when(payload.getByteBuf()).thenReturn(Unpooled.wrappedBuffer(new 
byte[]{1, 1}));
+        assertThat(new MySQLInt1BinaryProtocolValue().read(payload, false), 
is((byte) 1));
+        assertThat(new MySQLInt1BinaryProtocolValue().read(payload, true), 
is((short) 1));
     }
     
     @Test
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt2BinaryProtocolValueTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt2BinaryProtocolValueTest.java
index e3c46b09702..303aaafc66c 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt2BinaryProtocolValueTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt2BinaryProtocolValueTest.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.protocol;
 
+import io.netty.buffer.Unpooled;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,8 +37,9 @@ public final class MySQLInt2BinaryProtocolValueTest {
     
     @Test
     public void assertRead() {
-        when(payload.readInt2()).thenReturn(1);
-        assertThat(new MySQLInt2BinaryProtocolValue().read(payload), is(1));
+        when(payload.getByteBuf()).thenReturn(Unpooled.wrappedBuffer(new 
byte[]{1, 0, 1, 0}));
+        assertThat(new MySQLInt2BinaryProtocolValue().read(payload, false), 
is((short) 1));
+        assertThat(new MySQLInt2BinaryProtocolValue().read(payload, true), 
is(1));
     }
     
     @Test
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt4BinaryProtocolValueTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt4BinaryProtocolValueTest.java
index 5496e3799f8..08bb13b92ec 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt4BinaryProtocolValueTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt4BinaryProtocolValueTest.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.protocol;
 
+import io.netty.buffer.Unpooled;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,8 +37,9 @@ public final class MySQLInt4BinaryProtocolValueTest {
     
     @Test
     public void assertRead() {
-        when(payload.readInt4()).thenReturn(1);
-        assertThat(new MySQLInt4BinaryProtocolValue().read(payload), is(1));
+        when(payload.getByteBuf()).thenReturn(Unpooled.wrappedBuffer(new 
byte[]{1, 0, 0, 0, 1, 0, 0, 0}));
+        assertThat(new MySQLInt4BinaryProtocolValue().read(payload, false), 
is(1));
+        assertThat(new MySQLInt4BinaryProtocolValue().read(payload, true), 
is(1L));
     }
     
     @Test
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt8BinaryProtocolValueTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt8BinaryProtocolValueTest.java
index fca772779bf..a4c8d6f89cd 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt8BinaryProtocolValueTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLInt8BinaryProtocolValueTest.java
@@ -39,7 +39,7 @@ public final class MySQLInt8BinaryProtocolValueTest {
     @Test
     public void assertRead() {
         when(payload.readInt8()).thenReturn(1L);
-        assertThat(new MySQLInt8BinaryProtocolValue().read(payload), is(1L));
+        assertThat(new MySQLInt8BinaryProtocolValue().read(payload, false), 
is(1L));
     }
     
     @Test
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLStringLenencBinaryProtocolValueTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLStringLenencBinaryProtocolValueTest.java
index 5f4f9392c86..d8a42d051c4 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLStringLenencBinaryProtocolValueTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLStringLenencBinaryProtocolValueTest.java
@@ -37,7 +37,7 @@ public final class MySQLStringLenencBinaryProtocolValueTest {
     @Test
     public void assertRead() {
         when(payload.readStringLenenc()).thenReturn("value");
-        assertThat(new MySQLStringLenencBinaryProtocolValue().read(payload), 
is("value"));
+        assertThat(new MySQLStringLenencBinaryProtocolValue().read(payload, 
false), is("value"));
     }
     
     @Test
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLTimeBinaryProtocolValueTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLTimeBinaryProtocolValueTest.java
index 41bd0af15eb..44419371e5a 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLTimeBinaryProtocolValueTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/protocol/MySQLTimeBinaryProtocolValueTest.java
@@ -44,14 +44,14 @@ public final class MySQLTimeBinaryProtocolValueTest {
     
     @Test
     public void assertReadWithZeroByte() throws SQLException {
-        assertThat(new MySQLTimeBinaryProtocolValue().read(payload), is(new 
Timestamp(0)));
+        assertThat(new MySQLTimeBinaryProtocolValue().read(payload, false), 
is(new Timestamp(0)));
     }
     
     @Test
     public void assertReadWithEightBytes() throws SQLException {
         when(payload.readInt1()).thenReturn(8, 0, 10, 59, 0);
         Calendar actual = Calendar.getInstance();
-        actual.setTimeInMillis(((Timestamp) new 
MySQLTimeBinaryProtocolValue().read(payload)).getTime());
+        actual.setTimeInMillis(((Timestamp) new 
MySQLTimeBinaryProtocolValue().read(payload, false)).getTime());
         assertThat(actual.get(Calendar.HOUR_OF_DAY), is(10));
         assertThat(actual.get(Calendar.MINUTE), is(59));
         assertThat(actual.get(Calendar.SECOND), is(0));
@@ -61,7 +61,7 @@ public final class MySQLTimeBinaryProtocolValueTest {
     public void assertReadWithTwelveBytes() throws SQLException {
         when(payload.readInt1()).thenReturn(12, 0, 10, 59, 0);
         Calendar actual = Calendar.getInstance();
-        actual.setTimeInMillis(((Timestamp) new 
MySQLTimeBinaryProtocolValue().read(payload)).getTime());
+        actual.setTimeInMillis(((Timestamp) new 
MySQLTimeBinaryProtocolValue().read(payload, false)).getTime());
         assertThat(actual.get(Calendar.HOUR_OF_DAY), is(10));
         assertThat(actual.get(Calendar.MINUTE), is(59));
         assertThat(actual.get(Calendar.SECOND), is(0));
@@ -70,7 +70,7 @@ public final class MySQLTimeBinaryProtocolValueTest {
     @Test(expected = SQLFeatureNotSupportedException.class)
     public void assertReadWithIllegalArgument() throws SQLException {
         when(payload.readInt1()).thenReturn(100);
-        new MySQLTimeBinaryProtocolValue().read(payload);
+        new MySQLTimeBinaryProtocolValue().read(payload, false);
     }
     
     @Test
diff --git 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLServerPreparedStatement.java
 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLServerPreparedStatement.java
index fded1dbfd2f..55e30102d53 100644
--- 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLServerPreparedStatement.java
+++ 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLServerPreparedStatement.java
@@ -40,6 +40,8 @@ public final class MySQLServerPreparedStatement implements 
ServerPreparedStateme
     
     private final SQLStatementContext<?> sqlStatementContext;
     
+    private final List<Integer> parameterColumnDefinitionFlags;
+    
     private final List<MySQLPreparedStatementParameterType> parameterTypes = 
new CopyOnWriteArrayList<>();
     
     private final Map<Integer, byte[]> longData = new ConcurrentHashMap<>();
diff --git 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index cf9ecf6f688..724e1013a68 100644
--- 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -70,7 +70,7 @@ public final class MySQLComStmtExecuteExecutor implements 
QueryCommandExecutor {
     @Override
     public Collection<DatabasePacket<?>> execute() throws SQLException {
         MySQLServerPreparedStatement preparedStatement = 
updateAndGetPreparedStatement();
-        List<Object> params = 
packet.readParameters(preparedStatement.getParameterTypes(), 
preparedStatement.getLongData().keySet());
+        List<Object> params = 
packet.readParameters(preparedStatement.getParameterTypes(), 
preparedStatement.getLongData().keySet(), 
preparedStatement.getParameterColumnDefinitionFlags());
         preparedStatement.getLongData().forEach(params::set);
         SQLStatementContext<?> sqlStatementContext = 
preparedStatement.getSqlStatementContext();
         if (sqlStatementContext instanceof ParameterAware) {
diff --git 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
index 37ca0e5158c..66d1a673608 100644
--- 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
+++ 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
@@ -33,12 +33,12 @@ import 
org.apache.shardingsphere.infra.binder.segment.select.projection.Projecti
 import 
org.apache.shardingsphere.infra.binder.segment.select.projection.impl.ColumnProjection;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -47,6 +47,8 @@ import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLServerPreparedStatement;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.ParameterMarkerSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.AbstractSQLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.util.ArrayList;
@@ -56,6 +58,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
 /**
@@ -77,11 +80,12 @@ public final class MySQLComStmtPrepareExecutor implements 
CommandExecutor {
         if (!MySQLComStmtPrepareChecker.isStatementAllowed(sqlStatement)) {
             throw new UnsupportedPreparedStatementException();
         }
-        int statementId = 
MySQLStatementIDGenerator.getInstance().nextStatementId(connectionSession.getConnectionId());
         SQLStatementContext<?> sqlStatementContext = 
SQLStatementContextFactory.newInstance(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData(),
                 sqlStatement, connectionSession.getDefaultDatabaseName());
-        
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(statementId,
 new MySQLServerPreparedStatement(packet.getSql(), sqlStatementContext));
-        return createPackets(statementId, sqlStatementContext);
+        int statementId = 
MySQLStatementIDGenerator.getInstance().nextStatementId(connectionSession.getConnectionId());
+        MySQLServerPreparedStatement serverPreparedStatement = new 
MySQLServerPreparedStatement(packet.getSql(), sqlStatementContext, new 
CopyOnWriteArrayList<>());
+        
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(statementId,
 serverPreparedStatement);
+        return createPackets(sqlStatementContext, statementId, 
serverPreparedStatement);
     }
     
     private void failedIfContainsMultiStatements() {
@@ -93,7 +97,7 @@ public final class MySQLComStmtPrepareExecutor implements 
CommandExecutor {
         }
     }
     
-    private Collection<DatabasePacket<?>> createPackets(final int statementId, 
final SQLStatementContext<?> sqlStatementContext) {
+    private Collection<DatabasePacket<?>> createPackets(final 
SQLStatementContext<?> sqlStatementContext, final int statementId, final 
MySQLServerPreparedStatement serverPreparedStatement) {
         Collection<DatabasePacket<?>> result = new LinkedList<>();
         List<Projection> projections = getProjections(sqlStatementContext);
         int parameterCount = 
sqlStatementContext.getSqlStatement().getParameterCount();
@@ -101,7 +105,7 @@ public final class MySQLComStmtPrepareExecutor implements 
CommandExecutor {
         int characterSet = 
connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
         int statusFlags = 
ServerStatusFlagCalculator.calculateFor(connectionSession);
         if (parameterCount > 0) {
-            
result.addAll(createParameterColumnDefinition41Packets(parameterCount, 
characterSet));
+            
result.addAll(createParameterColumnDefinition41Packets(sqlStatementContext, 
characterSet, serverPreparedStatement));
             result.add(new MySQLEofPacket(statusFlags));
         }
         if (!projections.isEmpty()) {
@@ -115,20 +119,29 @@ public final class MySQLComStmtPrepareExecutor implements 
CommandExecutor {
         return sqlStatementContext instanceof SelectStatementContext ? 
((SelectStatementContext) 
sqlStatementContext).getProjectionsContext().getExpandProjections() : 
Collections.emptyList();
     }
     
-    private Collection<DatabasePacket<?>> 
createParameterColumnDefinition41Packets(final int parameterCount, final int 
characterSet) {
-        Collection<DatabasePacket<?>> result = new ArrayList<>(parameterCount);
-        for (int i = 0; i < parameterCount; i++) {
-            result.add(new MySQLColumnDefinition41Packet(characterSet, "", "", 
"", "?", "", 0, MySQLBinaryColumnType.MYSQL_TYPE_VAR_STRING, 0, false));
+    private Collection<DatabasePacket<?>> 
createParameterColumnDefinition41Packets(final SQLStatementContext<?> 
sqlStatementContext, final int characterSet,
+                                                                               
    final MySQLServerPreparedStatement serverPreparedStatement) {
+        Map<ParameterMarkerSegment, ShardingSphereColumn> 
columnsOfParameterMarkers =
+                
MySQLComStmtPrepareParameterMarkerExtractor.findColumnsOfParameterMarkers(sqlStatementContext.getSqlStatement(),
 getSchema(sqlStatementContext));
+        Collection<ParameterMarkerSegment> parameterMarkerSegments = 
((AbstractSQLStatement) 
sqlStatementContext.getSqlStatement()).getParameterMarkerSegments();
+        Collection<DatabasePacket<?>> result = new 
ArrayList<>(parameterMarkerSegments.size());
+        for (ParameterMarkerSegment each : parameterMarkerSegments) {
+            ShardingSphereColumn column = columnsOfParameterMarkers.get(each);
+            if (null != column) {
+                int columnDefinitionFlag = 
calculateColumnDefinitionFlag(column);
+                result.add(createMySQLColumnDefinition41Packet(characterSet, 
columnDefinitionFlag, 
MySQLBinaryColumnType.valueOfJDBCType(column.getDataType())));
+                
serverPreparedStatement.getParameterColumnDefinitionFlags().add(columnDefinitionFlag);
+            } else {
+                result.add(createMySQLColumnDefinition41Packet(characterSet, 
0, MySQLBinaryColumnType.MYSQL_TYPE_VAR_STRING));
+                
serverPreparedStatement.getParameterColumnDefinitionFlags().add(0);
+            }
         }
         return result;
     }
     
     private Collection<DatabasePacket<?>> 
createProjectionColumnDefinition41Packets(final SelectStatementContext 
selectStatementContext, final int characterSet) {
         Collection<Projection> projections = 
selectStatementContext.getProjectionsContext().getExpandProjections();
-        String databaseName = 
selectStatementContext.getTablesContext().getDatabaseName().orElseGet(connectionSession::getDefaultDatabaseName);
-        ShardingSphereDatabase database = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName);
-        ShardingSphereSchema schema = 
selectStatementContext.getTablesContext().getSchemaName()
-                .map(database::getSchema).orElseGet(() -> 
database.getSchema(DatabaseTypeEngine.getDefaultSchemaName(selectStatementContext.getDatabaseType(),
 database.getName())));
+        ShardingSphereSchema schema = getSchema(selectStatementContext);
         Map<String, String> columnToTableMap = 
selectStatementContext.getTablesContext()
                 
.findTableNamesByColumnProjection(projections.stream().filter(each -> each 
instanceof ColumnProjection).map(each -> (ColumnProjection) 
each).collect(Collectors.toList()), schema);
         Collection<DatabasePacket<?>> result = new 
ArrayList<>(projections.size());
@@ -136,22 +149,30 @@ public final class MySQLComStmtPrepareExecutor implements 
CommandExecutor {
             // TODO Calculate column definition flag for other projection types
             if (each instanceof ColumnProjection) {
                 
result.add(Optional.ofNullable(columnToTableMap.get(each.getExpression())).map(schema::getTable).map(table
 -> table.getColumns().get(((ColumnProjection) each).getName()))
-                        .map(column -> {
-                            MySQLBinaryColumnType columnType = 
MySQLBinaryColumnType.valueOfJDBCType(column.getDataType());
-                            return new 
MySQLColumnDefinition41Packet(characterSet, 
calculateColumnDefinitionFlag(column), "", "", "", "", "", 0, columnType, 0, 
false);
-                        })
-                        .orElseGet(() -> new 
MySQLColumnDefinition41Packet(characterSet, "", "", "", "", "", 0, 
MySQLBinaryColumnType.MYSQL_TYPE_VAR_STRING, 0, false)));
+                        .map(column -> 
createMySQLColumnDefinition41Packet(characterSet, 
calculateColumnDefinitionFlag(column), 
MySQLBinaryColumnType.valueOfJDBCType(column.getDataType())))
+                        .orElseGet(() -> 
createMySQLColumnDefinition41Packet(characterSet, 0, 
MySQLBinaryColumnType.MYSQL_TYPE_VAR_STRING)));
             } else {
-                result.add(new MySQLColumnDefinition41Packet(characterSet, "", 
"", "", "", "", 0, MySQLBinaryColumnType.MYSQL_TYPE_VAR_STRING, 0, false));
+                result.add(createMySQLColumnDefinition41Packet(characterSet, 
0, MySQLBinaryColumnType.MYSQL_TYPE_VAR_STRING));
             }
         }
         return result;
     }
     
+    private ShardingSphereSchema getSchema(final SQLStatementContext<?> 
sqlStatementContext) {
+        String databaseName = 
sqlStatementContext.getTablesContext().getDatabaseName().orElseGet(connectionSession::getDefaultDatabaseName);
+        ShardingSphereDatabase database = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName);
+        return 
sqlStatementContext.getTablesContext().getSchemaName().map(database::getSchema)
+                .orElseGet(() -> 
database.getSchema(DatabaseTypeEngine.getDefaultSchemaName(sqlStatementContext.getDatabaseType(),
 database.getName())));
+    }
+    
     private int calculateColumnDefinitionFlag(final ShardingSphereColumn 
column) {
         int result = 0;
         result |= column.isPrimaryKey() ? 
MySQLColumnDefinitionFlag.PRIMARY_KEY.getValue() : 0;
         result |= column.isUnsigned() ? 
MySQLColumnDefinitionFlag.UNSIGNED.getValue() : 0;
         return result;
     }
+    
+    private MySQLColumnDefinition41Packet 
createMySQLColumnDefinition41Packet(final int characterSet, final int 
columnDefinitionFlag, final MySQLBinaryColumnType columnType) {
+        return new MySQLColumnDefinition41Packet(characterSet, 
columnDefinitionFlag, "", "", "", "", "", 0, columnType, 0, false);
+    }
 }
diff --git 
a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareParameterMarkerExtractor.java
 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareParameterMarkerExtractor.java
new file mode 100644
index 00000000000..45de08beb8f
--- /dev/null
+++ 
b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareParameterMarkerExtractor.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.prepare;
+
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.InsertValuesSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.expr.ExpressionSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.expr.simple.ParameterMarkerExpressionSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.ParameterMarkerSegment;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Parameter marker extractor for MySQL COM_STMT_PREPARE.
+ */
+public final class MySQLComStmtPrepareParameterMarkerExtractor {
+    
+    /**
+     * TODO Support more statements and syntax.
+     * Find corresponding columns of parameter markers.
+     *
+     * @param sqlStatement SQL statement
+     * @param schema schema
+     * @return map parameter marker segment to column
+     */
+    public static Map<ParameterMarkerSegment, ShardingSphereColumn> 
findColumnsOfParameterMarkers(final SQLStatement sqlStatement, final 
ShardingSphereSchema schema) {
+        return sqlStatement instanceof InsertStatement ? 
findColumnsOfParameterMarkersForInsert((InsertStatement) sqlStatement, schema) 
: Collections.emptyMap();
+    }
+    
+    private static Map<ParameterMarkerSegment, ShardingSphereColumn> 
findColumnsOfParameterMarkersForInsert(final InsertStatement insertStatement, 
final ShardingSphereSchema schema) {
+        ShardingSphereTable table = 
schema.getTable(insertStatement.getTable().getTableName().getIdentifier().getValue());
+        List<String> columnNamesOfInsert = 
getColumnNamesOfInsertStatement(insertStatement, table);
+        Map<String, ShardingSphereColumn> columnsOfTable = table.getColumns();
+        Map<String, ShardingSphereColumn> caseInsensitiveColumnsOfTable = 
convertToCaseInsensitiveColumnsOfTable(columnsOfTable);
+        Map<ParameterMarkerSegment, ShardingSphereColumn> result = new 
LinkedHashMap<>(insertStatement.getParameterCount(), 1);
+        for (InsertValuesSegment each : insertStatement.getValues()) {
+            ListIterator<ExpressionSegment> listIterator = 
each.getValues().listIterator();
+            for (int columnIndex = listIterator.nextIndex(); 
listIterator.hasNext(); columnIndex = listIterator.nextIndex()) {
+                ExpressionSegment value = listIterator.next();
+                if (!(value instanceof ParameterMarkerExpressionSegment)) {
+                    continue;
+                }
+                String columnName = columnNamesOfInsert.get(columnIndex);
+                ShardingSphereColumn column = 
columnsOfTable.getOrDefault(columnName, 
caseInsensitiveColumnsOfTable.get(columnName));
+                if (null != column) {
+                    result.put((ParameterMarkerSegment) value, column);
+                }
+            }
+        }
+        return result;
+    }
+    
+    private static List<String> getColumnNamesOfInsertStatement(final 
InsertStatement insertStatement, final ShardingSphereTable table) {
+        return insertStatement.getColumns().isEmpty() ? new 
ArrayList<>(table.getColumns().keySet())
+                : insertStatement.getColumns().stream().map(each -> 
each.getIdentifier().getValue()).collect(Collectors.toList());
+    }
+    
+    private static Map<String, ShardingSphereColumn> 
convertToCaseInsensitiveColumnsOfTable(final Map<String, ShardingSphereColumn> 
columns) {
+        Map<String, ShardingSphereColumn> result = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        result.putAll(columns);
+        return result;
+    }
+}
diff --git 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
index c6d37c8b125..66e33161c82 100644
--- 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
+++ 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
@@ -45,6 +45,8 @@ import org.mockito.Answers;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.Collections;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -96,7 +98,7 @@ public final class MySQLCommandPacketFactoryTest {
         
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(serverPreparedStatementRegistry);
         SQLStatementContext<SelectStatement> sqlStatementContext = 
mock(SQLStatementContext.class);
         when(sqlStatementContext.getSqlStatement()).thenReturn(new 
MySQLSelectStatement());
-        serverPreparedStatementRegistry.addPreparedStatement(1, new 
MySQLServerPreparedStatement("select 1", sqlStatementContext));
+        serverPreparedStatementRegistry.addPreparedStatement(1, new 
MySQLServerPreparedStatement("select 1", sqlStatementContext, 
Collections.emptyList()));
         
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_EXECUTE,
 payload, connectionSession, false), 
instanceOf(MySQLComStmtExecutePacket.class));
     }
     
diff --git 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/admin/MySQLComResetConnectionExecutorTest.java
 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/admin/MySQLComResetConnectionExecutorTest.java
index f785b8c0132..0405b0d616f 100644
--- 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/admin/MySQLComResetConnectionExecutorTest.java
+++ 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/admin/MySQLComResetConnectionExecutorTest.java
@@ -31,6 +31,7 @@ import org.mockito.MockedConstruction;
 
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -51,7 +52,7 @@ public final class MySQLComResetConnectionExecutorTest {
         when(connectionSession.getTransactionStatus()).thenReturn(new 
TransactionStatus(TransactionType.LOCAL));
         
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(new 
ServerPreparedStatementRegistry());
         int statementId = 1;
-        
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(statementId,
 new MySQLServerPreparedStatement("", null));
+        
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(statementId,
 new MySQLServerPreparedStatement("", null, Collections.emptyList()));
         Collection<DatabasePacket<?>> actual;
         try (MockedConstruction<BackendTransactionManager> ignored = 
mockConstruction(BackendTransactionManager.class)) {
             actual = new 
MySQLComResetConnectionExecutor(connectionSession).execute();
diff --git 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutorTest.java
 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutorTest.java
index 81e9be5485b..cb5467dede6 100644
--- 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutorTest.java
+++ 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutorTest.java
@@ -44,7 +44,7 @@ public final class MySQLComStmtSendLongDataExecutorTest {
         when(packet.getData()).thenReturn(data);
         ConnectionSession connectionSession = mock(ConnectionSession.class);
         
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(new 
ServerPreparedStatementRegistry());
-        MySQLServerPreparedStatement preparedStatement = new 
MySQLServerPreparedStatement("insert into t (b) values (?)", 
mock(SQLStatementContext.class));
+        MySQLServerPreparedStatement preparedStatement = new 
MySQLServerPreparedStatement("insert into t (b) values (?)", 
mock(SQLStatementContext.class), Collections.emptyList());
         
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(1, 
preparedStatement);
         MySQLComStmtSendLongDataExecutor executor = new 
MySQLComStmtSendLongDataExecutor(packet, connectionSession);
         Collection<DatabasePacket<?>> actual = executor.execute();
diff --git 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
index c4a56c274ca..4c5588438ca 100644
--- 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
+++ 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
@@ -120,13 +120,13 @@ public final class MySQLComStmtExecuteExecutorTest 
extends ProxyContextRestorer
         
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
         SQLStatementContext<?> selectStatementContext = 
prepareSelectStatementContext();
         
when(connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(1))
-                .thenReturn(new MySQLServerPreparedStatement("select * from 
tbl where id = ?", selectStatementContext));
+                .thenReturn(new MySQLServerPreparedStatement("select * from 
tbl where id = ?", selectStatementContext, Collections.emptyList()));
         UpdateStatementContext updateStatementContext = 
mock(UpdateStatementContext.class, RETURNS_DEEP_STUBS);
         
when(updateStatementContext.getSqlStatement()).thenReturn(prepareUpdateStatement());
         
when(connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(2))
-                .thenReturn(new MySQLServerPreparedStatement("update tbl set 
col=1 where id = ?", updateStatementContext));
+                .thenReturn(new MySQLServerPreparedStatement("update tbl set 
col=1 where id = ?", updateStatementContext, Collections.emptyList()));
         
when(connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(3))
-                .thenReturn(new MySQLServerPreparedStatement("commit", new 
CommonSQLStatementContext<>(new MySQLCommitStatement())));
+                .thenReturn(new MySQLServerPreparedStatement("commit", new 
CommonSQLStatementContext<>(new MySQLCommitStatement()), 
Collections.emptyList()));
     }
     
     private ShardingSphereDatabase mockDatabase() {
diff --git 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
index 863267bc019..e39cc3d6e57 100644
--- 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
+++ 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
@@ -29,9 +29,12 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import 
org.apache.shardingsphere.dialect.mysql.exception.UnsupportedPreparedStatementException;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.InsertStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.dml.UpdateStatementContext;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.resource.ShardingSphereResourceMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
@@ -47,6 +50,7 @@ import 
org.apache.shardingsphere.proxy.frontend.mysql.ProxyContextRestorer;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLServerPreparedStatement;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import org.apache.shardingsphere.sql.parser.api.CacheOption;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLUpdateStatement;
 import org.junit.Before;
@@ -58,6 +62,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import java.nio.charset.StandardCharsets;
 import java.sql.Types;
+import java.util.Collections;
 import java.util.Iterator;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -81,6 +86,7 @@ public final class MySQLComStmtPrepareExecutorTest extends 
ProxyContextRestorer
     public void setup() {
         ProxyContext.init(mock(ContextManager.class, RETURNS_DEEP_STUBS));
         prepareSQLParser();
+        prepareMetaData();
         
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(new 
ServerPreparedStatementRegistry());
         
when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_UNICODE_CI);
     }
@@ -95,6 +101,18 @@ public final class MySQLComStmtPrepareExecutorTest extends 
ProxyContextRestorer
         
when(metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getProtocolType()).thenReturn(new
 MySQLDatabaseType());
     }
     
+    private static void prepareMetaData() {
+        ShardingSphereTable table = new ShardingSphereTable();
+        table.getColumns().put("id", new ShardingSphereColumn("id", 
Types.BIGINT, true, false, false, false, true));
+        table.getColumns().put("name", new ShardingSphereColumn("name", 
Types.VARCHAR, false, false, false, false, false));
+        table.getColumns().put("age", new ShardingSphereColumn("age", 
Types.SMALLINT, false, false, false, false, true));
+        ShardingSphereSchema schema = new ShardingSphereSchema();
+        schema.getTables().put("user", table);
+        ShardingSphereDatabase database = new ShardingSphereDatabase("db", new 
MySQLDatabaseType(), new ShardingSphereResourceMetaData("db", 
Collections.emptyMap()),
+                new ShardingSphereRuleMetaData(Collections.emptyList()), 
Collections.singletonMap("db", schema));
+        
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(database);
+    }
+    
     @Test(expected = UnsupportedPreparedStatementException.class)
     public void assertPrepareMultiStatements() {
         when(packet.getSql()).thenReturn("update t set v=v+1 where id=1;update t set v=v+1 where id=2;update t set v=v+1 where id=3");
@@ -130,7 +148,6 @@ public final class MySQLComStmtPrepareExecutorTest extends 
ProxyContextRestorer
         int connectionId = 2;
         when(connectionSession.getConnectionId()).thenReturn(connectionId);
         
MySQLStatementIDGenerator.getInstance().registerConnection(connectionId);
-        prepareTable();
         Iterator<DatabasePacket<?>> actualIterator = new 
MySQLComStmtPrepareExecutor(packet, connectionSession).execute().iterator();
         assertThat(actualIterator.next(), 
instanceOf(MySQLComStmtPrepareOKPacket.class));
         assertThat(actualIterator.next(), 
instanceOf(MySQLColumnDefinition41Packet.class));
@@ -153,14 +170,34 @@ public final class MySQLComStmtPrepareExecutorTest 
extends ProxyContextRestorer
         
MySQLStatementIDGenerator.getInstance().unregisterConnection(connectionId);
     }
     
-    private static void prepareTable() {
-        ShardingSphereTable table = new ShardingSphereTable();
-        table.getColumns().put("id", new ShardingSphereColumn("id", 
Types.BIGINT, true, false, false, false, true));
-        table.getColumns().put("name", new ShardingSphereColumn("name", 
Types.VARCHAR, false, false, false, false, false));
-        table.getColumns().put("age", new ShardingSphereColumn("age", 
Types.SMALLINT, false, false, false, false, true));
-        ShardingSphereSchema schema = new ShardingSphereSchema();
-        schema.getTables().put("user", table);
-        
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db").getSchema("db")).thenReturn(schema);
+    @Test
+    public void assertPrepareInsertStatement() {
+        String sql = "insert into user (id, name, age) values (1, ?, ?), (?, 'bar', ?)";
+        when(packet.getSql()).thenReturn(sql);
+        int connectionId = 2;
+        when(connectionSession.getConnectionId()).thenReturn(connectionId);
+        when(connectionSession.getDefaultDatabaseName()).thenReturn("db");
+        
MySQLStatementIDGenerator.getInstance().registerConnection(connectionId);
+        Iterator<DatabasePacket<?>> actualIterator = new 
MySQLComStmtPrepareExecutor(packet, connectionSession).execute().iterator();
+        assertThat(actualIterator.next(), 
instanceOf(MySQLComStmtPrepareOKPacket.class));
+        assertThat(actualIterator.next(), 
instanceOf(MySQLColumnDefinition41Packet.class));
+        DatabasePacket<?> firstAgeColumnDefinitionPacket = 
actualIterator.next();
+        assertThat(firstAgeColumnDefinitionPacket, 
instanceOf(MySQLColumnDefinition41Packet.class));
+        assertThat(getColumnDefinitionFlag((MySQLColumnDefinition41Packet) 
firstAgeColumnDefinitionPacket), 
is(MySQLColumnDefinitionFlag.UNSIGNED.getValue()));
+        DatabasePacket<?> idColumnDefinitionPacket = actualIterator.next();
+        assertThat(idColumnDefinitionPacket, 
instanceOf(MySQLColumnDefinition41Packet.class));
+        assertThat(getColumnDefinitionFlag((MySQLColumnDefinition41Packet) 
idColumnDefinitionPacket),
+                is(MySQLColumnDefinitionFlag.PRIMARY_KEY.getValue() | 
MySQLColumnDefinitionFlag.UNSIGNED.getValue()));
+        DatabasePacket<?> secondAgeColumnDefinitionPacket = 
actualIterator.next();
+        assertThat(secondAgeColumnDefinitionPacket, 
instanceOf(MySQLColumnDefinition41Packet.class));
+        assertThat(getColumnDefinitionFlag((MySQLColumnDefinition41Packet) 
secondAgeColumnDefinitionPacket), 
is(MySQLColumnDefinitionFlag.UNSIGNED.getValue()));
+        assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
+        assertFalse(actualIterator.hasNext());
+        MySQLServerPreparedStatement actualPreparedStatement = 
connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(1);
+        assertThat(actualPreparedStatement.getSql(), is(sql));
+        assertThat(actualPreparedStatement.getSqlStatementContext(), 
instanceOf(InsertStatementContext.class));
+        
assertThat(actualPreparedStatement.getSqlStatementContext().getSqlStatement(), 
instanceOf(MySQLInsertStatement.class));
+        
MySQLStatementIDGenerator.getInstance().unregisterConnection(connectionId);
     }
     
     private int getColumnDefinitionFlag(final MySQLColumnDefinition41Packet 
packet) {
@@ -171,13 +208,16 @@ public final class MySQLComStmtPrepareExecutorTest 
extends ProxyContextRestorer
     
     @Test
     public void assertPrepareUpdateStatement() {
-        String sql = "update t set v = ?";
+        String sql = "update user set name = ?, age = ? where id = ?";
         when(packet.getSql()).thenReturn(sql);
         when(connectionSession.getConnectionId()).thenReturn(1);
+        when(connectionSession.getDefaultDatabaseName()).thenReturn("db");
         MySQLStatementIDGenerator.getInstance().registerConnection(1);
         Iterator<DatabasePacket<?>> actualIterator = new 
MySQLComStmtPrepareExecutor(packet, connectionSession).execute().iterator();
         assertThat(actualIterator.next(), 
instanceOf(MySQLComStmtPrepareOKPacket.class));
         assertThat(actualIterator.next(), 
instanceOf(MySQLColumnDefinition41Packet.class));
+        assertThat(actualIterator.next(), 
instanceOf(MySQLColumnDefinition41Packet.class));
+        assertThat(actualIterator.next(), 
instanceOf(MySQLColumnDefinition41Packet.class));
         assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
         assertFalse(actualIterator.hasNext());
         MySQLServerPreparedStatement actualPreparedStatement = 
connectionSession.getServerPreparedStatementRegistry().getPreparedStatement(1);
diff --git 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareParameterMarkerExtractorTest.java
 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareParameterMarkerExtractorTest.java
new file mode 100644
index 00000000000..18a295e918a
--- /dev/null
+++ 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareParameterMarkerExtractorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.prepare;
+
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
+import org.apache.shardingsphere.sql.parser.api.CacheOption;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.ParameterMarkerSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.AbstractSQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.junit.Test;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public final class MySQLComStmtPrepareParameterMarkerExtractorTest {
+    
+    @Test
+    public void assertFindColumnsOfParameterMarkersForInsertStatement() {
+        String sql = "insert into user (id, name, age) values (1, ?, ?), (?, 'bar', ?)";
+        SQLStatement sqlStatement = new ShardingSphereSQLParserEngine("MySQL", 
new CacheOption(0, 0), new CacheOption(0, 0), false).parse(sql, false);
+        ShardingSphereSchema schema = prepareSchema();
+        Map<ParameterMarkerSegment, ShardingSphereColumn> actual = 
MySQLComStmtPrepareParameterMarkerExtractor.findColumnsOfParameterMarkers(sqlStatement,
 schema);
+        List<ParameterMarkerSegment> parameterMarkerSegments = new 
ArrayList<>(((AbstractSQLStatement) sqlStatement).getParameterMarkerSegments());
+        assertThat(actual.get(parameterMarkerSegments.get(0)), 
is(schema.getTable("user").getColumns().get("name")));
+        assertThat(actual.get(parameterMarkerSegments.get(1)), 
is(schema.getTable("user").getColumns().get("age")));
+        assertThat(actual.get(parameterMarkerSegments.get(2)), 
is(schema.getTable("user").getColumns().get("id")));
+        assertThat(actual.get(parameterMarkerSegments.get(3)), 
is(schema.getTable("user").getColumns().get("age")));
+    }
+    
+    private ShardingSphereSchema prepareSchema() {
+        ShardingSphereTable table = new ShardingSphereTable();
+        table.getColumns().put("id", new ShardingSphereColumn("id", 
Types.BIGINT, true, false, false, false, true));
+        table.getColumns().put("name", new ShardingSphereColumn("name", 
Types.VARCHAR, false, false, false, false, false));
+        table.getColumns().put("age", new ShardingSphereColumn("age", 
Types.SMALLINT, false, false, false, false, true));
+        ShardingSphereSchema result = new ShardingSphereSchema();
+        result.getTables().put("user", table);
+        return result;
+    }
+}
diff --git 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
index 3a8d8d8da6a..d77e72114b4 100644
--- 
a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
+++ 
b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
@@ -29,6 +29,7 @@ import 
org.apache.shardingsphere.transaction.api.TransactionType;
 import org.junit.Test;
 
 import java.util.Collection;
+import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -44,7 +45,7 @@ public final class MySQLComStmtResetExecutorTest {
         ConnectionSession connectionSession = mock(ConnectionSession.class);
         
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(new 
ServerPreparedStatementRegistry());
         when(connectionSession.getTransactionStatus()).thenReturn(new 
TransactionStatus(TransactionType.LOCAL));
-        MySQLServerPreparedStatement preparedStatement = new 
MySQLServerPreparedStatement("", mock(SQLStatementContext.class));
+        MySQLServerPreparedStatement preparedStatement = new 
MySQLServerPreparedStatement("", mock(SQLStatementContext.class), 
Collections.emptyList());
         preparedStatement.getLongData().put(0, new byte[0]);
         
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(1, 
preparedStatement);
         MySQLComStmtResetPacket packet = mock(MySQLComStmtResetPacket.class);
diff --git 
a/test/e2e/suite/src/test/resources/cases/dml/dataset/passthrough/mysql/insert_min_values_into_single_table_integer.xml
 
b/test/e2e/suite/src/test/resources/cases/dml/dataset/passthrough/mysql/insert_min_values_into_single_table_integer.xml
deleted file mode 100644
index d2fc5704b32..00000000000
--- 
a/test/e2e/suite/src/test/resources/cases/dml/dataset/passthrough/mysql/insert_min_values_into_single_table_integer.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<dataset update-count="1">
-    <metadata data-nodes="passthrough.t_data_type_integer">
-        <column name="id" type="numeric" />
-        <column name="col_bigint" type="numeric" />
-        <column name="col_int" type="numeric" />
-        <column name="col_mediumint" type="numeric" />
-        <column name="col_smallint" type="numeric" />
-        <column name="col_tinyint" type="numeric" />
-    </metadata>
-    <row data-node="passthrough.t_data_type_integer" values="2, -9223372036854775808, -2147483648, -8388608, 0, -128" />
-</dataset>
diff --git 
a/test/e2e/suite/src/test/resources/cases/dml/dataset/passthrough/mysql/insert_negative_smallint_into_single_table_integer.xml
 
b/test/e2e/suite/src/test/resources/cases/dml/dataset/passthrough/mysql/insert_negative_smallint_into_single_table_integer.xml
deleted file mode 100644
index 682cefa3625..00000000000
--- 
a/test/e2e/suite/src/test/resources/cases/dml/dataset/passthrough/mysql/insert_negative_smallint_into_single_table_integer.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<dataset update-count="1">
-    <metadata data-nodes="passthrough.t_data_type_integer">
-        <column name="id" type="numeric" />
-        <column name="col_bigint" type="numeric" />
-        <column name="col_int" type="numeric" />
-        <column name="col_mediumint" type="numeric" />
-        <column name="col_smallint" type="numeric" />
-        <column name="col_tinyint" type="numeric" />
-    </metadata>
-    <row data-node="passthrough.t_data_type_integer" values="4, 0, 0, 0, -1, 0" />
-</dataset>
diff --git 
a/test/e2e/suite/src/test/resources/cases/dml/dataset/passthrough/mysql/insert_negative_values_into_single_table_integer.xml
 
b/test/e2e/suite/src/test/resources/cases/dml/dataset/passthrough/mysql/insert_negative_values_into_single_table_integer.xml
deleted file mode 100644
index 8115164f13e..00000000000
--- 
a/test/e2e/suite/src/test/resources/cases/dml/dataset/passthrough/mysql/insert_negative_values_into_single_table_integer.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<dataset update-count="1">
-    <metadata data-nodes="passthrough.t_data_type_integer">
-        <column name="id" type="numeric" />
-        <column name="col_bigint" type="numeric" />
-        <column name="col_int" type="numeric" />
-        <column name="col_mediumint" type="numeric" />
-        <column name="col_smallint" type="numeric" />
-        <column name="col_tinyint" type="numeric" />
-    </metadata>
-    <row data-node="passthrough.t_data_type_integer" values="3, -1, -1, -1, 0, -1" />
-</dataset>
diff --git 
a/test/e2e/suite/src/test/resources/cases/dml/dml-integration-test-cases.xml 
b/test/e2e/suite/src/test/resources/cases/dml/dml-integration-test-cases.xml
index 1b6ce1e871a..4c232286a79 100644
--- a/test/e2e/suite/src/test/resources/cases/dml/dml-integration-test-cases.xml
+++ b/test/e2e/suite/src/test/resources/cases/dml/dml-integration-test-cases.xml
@@ -290,21 +290,13 @@
         <assertion parameters="0:int, 4:int, 5:int" 
expected-data-file="shadow_delete_order_by_user_id.xml" />
     </test-case>
     
-    <test-case sql="INSERT INTO t_data_type_integer (id, col_bigint, col_int, 
col_mediumint, col_smallint, col_tinyint) values (?, ?, ?, ?, ?, ?)" 
db-types="MySQL" scenario-types="passthrough">
-        <assertion parameters="1:int, 9223372036854775807:long, 
2147483647:int, 8388607:int, 32767:smallint, 127:tinyint" 
expected-data-file="insert_max_values_into_single_table_integer.xml" />
-        <!-- TODO Support negative value of smallint & tinyint. 
https://github.com/apache/shardingsphere/issues/21902 -->
-        <assertion parameters="2:int, -9223372036854775808:long, -2147483648:int, -8388608:int, 0:smallint, -128:tinyint" expected-data-file="insert_min_values_into_single_table_integer.xml" />
-        <assertion parameters="3:int, -1:long, -1:int, -1:int, 0:smallint, -1:tinyint" expected-data-file="insert_negative_values_into_single_table_integer.xml" />
-        <!-- <assertion parameters="4:int, 0:int, 0:int, 0:int, -1:smallint, 
0:int" 
expected-data-file="insert_negative_smallint_into_single_table_integer.xml" 
/>-->
-    </test-case>
-    
     <test-case sql="INSERT INTO t_data_type_integer_unsigned (id, 
col_bigint_unsigned, col_int_unsigned, col_mediumint_unsigned, 
col_smallint_unsigned, col_tinyint_unsigned) values (?, ?, ?, ?, ?, ?)" 
db-types="MySQL" scenario-types="passthrough">
         <!-- TODO Test unsigned with MySQL Connector/J 8.0.x client 
https://github.com/apache/shardingsphere/issues/14349 -->
         <assertion parameters="1:int, 18446744073709551615:decimal, 
4294967295:long, 16777215:int, 65535:int, 255:smallint" 
expected-data-file="insert_max_values_into_single_table_integer_unsigned.xml" />
         <assertion parameters="2:int, 0:long, 0:int, 0:int, 0:smallint, 
0:tinyint" 
expected-data-file="insert_min_values_into_single_table_integer_unsigned.xml" />
     </test-case>
     
-    <test-case sql="INSERT INTO t_data_type_integer (id, col_bigint, col_int, 
col_mediumint, col_smallint, col_tinyint) values (?, ?, ?, ?, ?, ?)" 
db-types="PostgreSQL,openGauss" scenario-types="passthrough">
+    <test-case sql="INSERT INTO t_data_type_integer (id, col_bigint, col_int, 
col_mediumint, col_smallint, col_tinyint) values (?, ?, ?, ?, ?, ?)" 
db-types="MySQL,PostgreSQL,openGauss" scenario-types="passthrough">
         <assertion parameters="1:int, 9223372036854775807:long, 
2147483647:int, 8388607:int, 32767:smallint, 127:tinyint" 
expected-data-file="insert_max_values_into_single_table_integer.xml" />
         <assertion parameters="2:int, -9223372036854775808:long, -2147483648:int, -8388608:int, -32768:smallint, -128:tinyint" expected-data-file="insert_min_values_into_single_table_integer.xml" />
         <assertion parameters="3:int, -1:long, -1:int, -1:int, -1:smallint, -1:tinyint" expected-data-file="insert_negative_values_into_single_table_integer.xml" />


Reply via email to