This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2627271  Isolate stateful prepared statements between MySQL connections (#13652)
2627271 is described below

commit 2627271695cdcdcf9cd3648afc35615860bf5bb0
Author: 吴伟杰 <[email protected]>
AuthorDate: Wed Nov 17 14:15:51 2021 +0800

    Isolate stateful prepared statements between MySQL connections (#13652)
    
    * Isolate stateful prepared statements between MySQL connections
    
    * Rename binary statement to prepared statement
    
    * Update javadoc
---
 .../packet/command/MySQLCommandPacketFactory.java  |   5 +-
 .../query/binary/MySQLBinaryStatementRegistry.java |  89 ----------------
 ...yStatement.java => MySQLPreparedStatement.java} |   4 +-
 ...va => MySQLPreparedStatementParameterType.java} |   2 +-
 .../binary/MySQLPreparedStatementRegistry.java     | 113 +++++++++++++++++++++
 .../binary/close/MySQLComStmtClosePacket.java      |   8 --
 .../binary/execute/MySQLComStmtExecutePacket.java  |  30 +++---
 .../MySQLMySQLCommandPacketFactoryTest.java        |  72 ++++++-------
 .../binary/MySQLBinaryStatementRegistryTest.java   |  63 ------------
 .../binary/MySQLPreparedStatementRegistryTest.java |  71 +++++++++++++
 .../binary/close/MySQLComStmtClosePacketTest.java  |   7 --
 .../execute/MySQLComStmtExecutePacketTest.java     |  21 ++--
 .../fixture/BinaryStatementRegistryUtil.java       |  47 ---------
 .../proxy/frontend/mysql/MySQLFrontendEngine.java  |   2 +
 .../authentication/MySQLAuthenticationEngine.java  |   2 +
 .../mysql/command/MySQLCommandExecuteEngine.java   |   2 +-
 .../mysql/command/MySQLCommandExecutorFactory.java |   2 +-
 .../binary/close/MySQLComStmtCloseExecutor.java    |   6 +-
 .../prepare/MySQLComStmtPrepareExecutor.java       |   6 +-
 19 files changed, 264 insertions(+), 288 deletions(-)

diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/MySQLCommandPacketFactory.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/MySQLCommandPacketFactory.java
index bd70348..0f7d24c 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/MySQLCommandPacketFactory.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/MySQLCommandPacketFactory.java
@@ -44,10 +44,11 @@ public final class MySQLCommandPacketFactory {
      *
      * @param commandPacketType command packet type for MySQL
      * @param payload packet payload for MySQL
+     * @param connectionId connection ID
      * @return command packet for MySQL
      * @throws SQLException SQL exception
      */
-    public static MySQLCommandPacket newInstance(final MySQLCommandPacketType 
commandPacketType, final MySQLPacketPayload payload) throws SQLException {
+    public static MySQLCommandPacket newInstance(final MySQLCommandPacketType 
commandPacketType, final MySQLPacketPayload payload, final int connectionId) 
throws SQLException {
         switch (commandPacketType) {
             case COM_QUIT:
                 return new MySQLComQuitPacket();
@@ -60,7 +61,7 @@ public final class MySQLCommandPacketFactory {
             case COM_STMT_PREPARE:
                 return new MySQLComStmtPreparePacket(payload);
             case COM_STMT_EXECUTE:
-                return new MySQLComStmtExecutePacket(payload);
+                return new MySQLComStmtExecutePacket(payload, connectionId);
             case COM_STMT_RESET:
                 return new MySQLComStmtResetPacket(payload);
             case COM_STMT_CLOSE:
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatementRegistry.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatementRegistry.java
deleted file mode 100644
index a234953..0000000
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatementRegistry.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * MySQL binary prepared statement registry.
- */
-@NoArgsConstructor(access = AccessLevel.NONE)
-public final class MySQLBinaryStatementRegistry {
-    
-    private static final MySQLBinaryStatementRegistry INSTANCE = new 
MySQLBinaryStatementRegistry();
-    
-    private final ConcurrentMap<String, Integer> statementIdAssigner = new 
ConcurrentHashMap<>(65535, 1);
-    
-    private final ConcurrentMap<Integer, MySQLBinaryStatement> 
binaryStatements = new ConcurrentHashMap<>(65535, 1);
-    
-    private final AtomicInteger sequence = new AtomicInteger();
-    
-    /**
-     * Get prepared statement registry instance.
-     *
-     * @return prepared statement registry instance
-     */
-    public static MySQLBinaryStatementRegistry getInstance() {
-        return INSTANCE;
-    }
-    
-    /**
-     * Register.
-     *
-     * @param sql SQL
-     * @param parameterCount parameter count
-     * @return statement ID
-     */
-    public synchronized int register(final String sql, final int 
parameterCount) {
-        Integer result = statementIdAssigner.get(sql);
-        if (null != result) {
-            return result;
-        }
-        result = sequence.incrementAndGet();
-        statementIdAssigner.putIfAbsent(sql, result);
-        binaryStatements.putIfAbsent(result, new MySQLBinaryStatement(sql, 
parameterCount));
-        return result;
-    }
-    
-    /**
-     * Get binary statement.
-     *
-     * @param statementId statement ID
-     * @return binary prepared statement
-     */
-    public MySQLBinaryStatement get(final int statementId) {
-        return binaryStatements.get(statementId);
-    }
-    
-    /**
-     * Unregister.
-     *
-     * @param statementId statement ID
-     */
-    public synchronized void unregister(final int statementId) {
-        if (binaryStatements.containsKey(statementId)) {
-            
statementIdAssigner.remove(binaryStatements.get(statementId).getSql());
-            binaryStatements.remove(statementId);
-        }
-    }
-}
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatement.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
similarity index 91%
rename from shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatement.java
rename to shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
index e459412..1a03fd3 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatement.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
@@ -29,11 +29,11 @@ import java.util.List;
 @RequiredArgsConstructor
 @Getter
 @Setter
-public final class MySQLBinaryStatement {
+public final class MySQLPreparedStatement {
     
     private final String sql;
     
     private final int parameterCount;
     
-    private List<MySQLBinaryStatementParameterType> parameterTypes;
+    private List<MySQLPreparedStatementParameterType> parameterTypes;
 }
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatementParameterType.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementParameterType.java
similarity index 95%
rename from 
shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatementParameterType.java
rename to 
shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementParameterType.java
index 03ff4dc..eab2400 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatementParameterType.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementParameterType.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
  */
 @RequiredArgsConstructor
 @Getter
-public final class MySQLBinaryStatementParameterType {
+public final class MySQLPreparedStatementParameterType {
     
     private final MySQLBinaryColumnType columnType;
     
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java
new file mode 100644
index 0000000..e2eb11d
--- /dev/null
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * MySQL prepared statement registry.
+ */
+@NoArgsConstructor(access = AccessLevel.NONE)
+public final class MySQLPreparedStatementRegistry {
+    
+    private static final MySQLPreparedStatementRegistry INSTANCE = new 
MySQLPreparedStatementRegistry();
+    
+    private final ConcurrentMap<Integer, MySQLConnectionPreparedStatements> 
connectionRegistry = new ConcurrentHashMap<>(8192, 1);
+    
+    /**
+     * Get prepared statement registry instance.
+     *
+     * @return prepared statement registry instance
+     */
+    public static MySQLPreparedStatementRegistry getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Register connection.
+     *
+     * @param connectionId connection ID
+     */
+    public void registerConnection(final int connectionId) {
+        connectionRegistry.put(connectionId, new 
MySQLConnectionPreparedStatements());
+    }
+    
+    /**
+     * Get connection prepared statements.
+     * 
+     * @param connectionId connection ID
+     * @return MySQL connection prepared statements
+     */
+    public MySQLConnectionPreparedStatements 
getConnectionPreparedStatements(final int connectionId) {
+        return connectionRegistry.get(connectionId);
+    }
+    
+    /**
+     * Unregister connection.
+     *
+     * @param connectionId connection ID
+     */
+    public void unregisterConnection(final int connectionId) {
+        connectionRegistry.remove(connectionId);
+    }
+    
+    public static class MySQLConnectionPreparedStatements {
+        
+        private final Map<Integer, MySQLPreparedStatement> preparedStatements 
= new ConcurrentHashMap<>(16384, 1);
+        
+        private final AtomicInteger sequence = new AtomicInteger();
+        
+        /**
+         * Prepare statement.
+         *
+         * @param sql SQL
+         * @param parameterCount parameter count
+         * @return statement ID
+         */
+        public int prepareStatement(final String sql, final int 
parameterCount) {
+            int result = sequence.incrementAndGet();
+            preparedStatements.put(result, new MySQLPreparedStatement(sql, 
parameterCount));
+            return result;
+        }
+        
+        /**
+         * Get prepared statement.
+         *
+         * @param statementId statement ID
+         * @return prepared statement
+         */
+        public MySQLPreparedStatement get(final int statementId) {
+            return preparedStatements.get(statementId);
+        }
+        
+        /**
+         * Close statement.
+         *
+         * @param statementId statement ID
+         */
+        public void closeStatement(final int statementId) {
+            preparedStatements.remove(statementId);
+        }
+    }
+}
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/close/MySQLComStmtClosePacket.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/close/MySQLComStmtClosePacket.java
index 1af5d46..c32b9d2 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/close/MySQLComStmtClosePacket.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/close/MySQLComStmtClosePacket.java
@@ -21,7 +21,6 @@ import lombok.Getter;
 import lombok.ToString;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLBinaryStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
 /**
@@ -39,11 +38,4 @@ public final class MySQLComStmtClosePacket extends 
MySQLCommandPacket {
         super(MySQLCommandPacketType.COM_STMT_CLOSE);
         statementId = payload.readInt4();
     }
-    
-    /**
-     * Remove cached statement.
-     */
-    public void removeCachedStatement() {
-        MySQLBinaryStatementRegistry.getInstance().unregister(statementId);
-    }
 }
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
index 35fb9a6..4e0b399 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
@@ -24,9 +24,9 @@ import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLBinaryStatement;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLBinaryStatementParameterType;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLBinaryStatementRegistry;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatement;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementParameterType;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.protocol.MySQLBinaryProtocolValue;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.protocol.MySQLBinaryProtocolValueFactory;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
@@ -50,7 +50,7 @@ public final class MySQLComStmtExecutePacket extends 
MySQLCommandPacket {
     
     private final int statementId;
     
-    private final MySQLBinaryStatement binaryStatement;
+    private final MySQLPreparedStatement preparedStatement;
     
     private final int flags;
     
@@ -64,14 +64,14 @@ public final class MySQLComStmtExecutePacket extends 
MySQLCommandPacket {
     @Getter
     private final List<Object> parameters;
     
-    public MySQLComStmtExecutePacket(final MySQLPacketPayload payload) throws 
SQLException {
+    public MySQLComStmtExecutePacket(final MySQLPacketPayload payload, final 
int connectionId) throws SQLException {
         super(MySQLCommandPacketType.COM_STMT_EXECUTE);
         statementId = payload.readInt4();
-        binaryStatement = 
MySQLBinaryStatementRegistry.getInstance().get(statementId);
+        preparedStatement = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionId).get(statementId);
         flags = payload.readInt1();
         Preconditions.checkArgument(ITERATION_COUNT == payload.readInt4());
-        int parameterCount = binaryStatement.getParameterCount();
-        sql = binaryStatement.getSql();
+        int parameterCount = preparedStatement.getParameterCount();
+        sql = preparedStatement.getSql();
         if (parameterCount > 0) {
             nullBitmap = new MySQLNullBitmap(parameterCount, 
NULL_BITMAP_OFFSET);
             for (int i = 0; i < nullBitmap.getNullBitmap().length; i++) {
@@ -79,7 +79,7 @@ public final class MySQLComStmtExecutePacket extends 
MySQLCommandPacket {
             }
             newParametersBoundFlag = 
MySQLNewParametersBoundFlag.valueOf(payload.readInt1());
             if (MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST == 
newParametersBoundFlag) {
-                binaryStatement.setParameterTypes(getParameterTypes(payload, 
parameterCount));
+                preparedStatement.setParameterTypes(getParameterTypes(payload, 
parameterCount));
             }
             parameters = getParameters(payload, parameterCount);
         } else {
@@ -89,12 +89,12 @@ public final class MySQLComStmtExecutePacket extends 
MySQLCommandPacket {
         }
     }
     
-    private List<MySQLBinaryStatementParameterType> getParameterTypes(final 
MySQLPacketPayload payload, final int parameterCount) {
-        List<MySQLBinaryStatementParameterType> result = new 
ArrayList<>(parameterCount);
+    private List<MySQLPreparedStatementParameterType> getParameterTypes(final 
MySQLPacketPayload payload, final int parameterCount) {
+        List<MySQLPreparedStatementParameterType> result = new 
ArrayList<>(parameterCount);
         for (int parameterIndex = 0; parameterIndex < parameterCount; 
parameterIndex++) {
             MySQLBinaryColumnType columnType = 
MySQLBinaryColumnType.valueOf(payload.readInt1());
             int unsignedFlag = payload.readInt1();
-            result.add(new MySQLBinaryStatementParameterType(columnType, 
unsignedFlag));
+            result.add(new MySQLPreparedStatementParameterType(columnType, 
unsignedFlag));
         }
         return result;
     }
@@ -102,7 +102,7 @@ public final class MySQLComStmtExecutePacket extends 
MySQLCommandPacket {
     private List<Object> getParameters(final MySQLPacketPayload payload, final 
int parameterCount) throws SQLException {
         List<Object> result = new ArrayList<>(parameterCount);
         for (int parameterIndex = 0; parameterIndex < parameterCount; 
parameterIndex++) {
-            MySQLBinaryProtocolValue binaryProtocolValue = 
MySQLBinaryProtocolValueFactory.getBinaryProtocolValue(binaryStatement.getParameterTypes().get(parameterIndex).getColumnType());
+            MySQLBinaryProtocolValue binaryProtocolValue = 
MySQLBinaryProtocolValueFactory.getBinaryProtocolValue(preparedStatement.getParameterTypes().get(parameterIndex).getColumnType());
             result.add(nullBitmap.isNullParameter(parameterIndex) ? null : 
binaryProtocolValue.read(payload));
         }
         return result;
@@ -113,14 +113,14 @@ public final class MySQLComStmtExecutePacket extends 
MySQLCommandPacket {
         payload.writeInt4(statementId);
         payload.writeInt1(flags);
         payload.writeInt4(ITERATION_COUNT);
-        if (binaryStatement.getParameterCount() > 0) {
+        if (preparedStatement.getParameterCount() > 0) {
             for (int each : nullBitmap.getNullBitmap()) {
                 payload.writeInt1(each);
             }
             payload.writeInt1(newParametersBoundFlag.getValue());
             int count = 0;
             for (Object each : parameters) {
-                MySQLBinaryStatementParameterType parameterType = 
binaryStatement.getParameterTypes().get(count);
+                MySQLPreparedStatementParameterType parameterType = 
preparedStatement.getParameterTypes().get(count);
                 payload.writeInt1(parameterType.getColumnType().getValue());
                 payload.writeInt1(parameterType.getUnsignedFlag());
                 payload.writeStringLenenc(null == each ? "" : each.toString());
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/MySQLMySQLCommandPacketFactoryTest.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/MySQLMySQLCommandPacketFactoryTest.java
index 9672ef3..42fdbd2 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/MySQLMySQLCommandPacketFactoryTest.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/MySQLMySQLCommandPacketFactoryTest.java
@@ -22,7 +22,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLBinaryStatementRegistry;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -44,170 +44,174 @@ import static org.mockito.Mockito.when;
 @RunWith(MockitoJUnitRunner.class)
 public final class MySQLMySQLCommandPacketFactoryTest {
     
+    private static final int CONNECTION_ID = 1;
+    
     @Mock
     private MySQLPacketPayload payload;
     
     @Test
     public void assertNewInstanceWithComQuitPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUIT,
 payload), instanceOf(MySQLComQuitPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUIT,
 payload, CONNECTION_ID), instanceOf(MySQLComQuitPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComInitDbPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_INIT_DB,
 payload), instanceOf(MySQLComInitDbPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_INIT_DB,
 payload, CONNECTION_ID), instanceOf(MySQLComInitDbPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComFieldListPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_FIELD_LIST,
 payload), instanceOf(MySQLComFieldListPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_FIELD_LIST,
 payload, CONNECTION_ID), instanceOf(MySQLComFieldListPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComQueryPacket() throws SQLException {
         when(payload.readStringEOF()).thenReturn("SHOW TABLES");
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUERY,
 payload), instanceOf(MySQLComQueryPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUERY,
 payload, CONNECTION_ID), instanceOf(MySQLComQueryPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtPreparePacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_PREPARE,
 payload), instanceOf(MySQLComStmtPreparePacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_PREPARE,
 payload, CONNECTION_ID), instanceOf(MySQLComStmtPreparePacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtExecutePacket() throws 
SQLException {
         
when(payload.readInt1()).thenReturn(MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST.getValue());
         when(payload.readInt4()).thenReturn(1);
-        MySQLBinaryStatementRegistry.getInstance().register("SELECT * FROM 
t_order", 1);
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_EXECUTE,
 payload), instanceOf(MySQLComStmtExecutePacket.class));
+        
MySQLPreparedStatementRegistry.getInstance().registerConnection(CONNECTION_ID);
+        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement("SELECT
 * FROM t_order", 1);
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_EXECUTE,
 payload, CONNECTION_ID), instanceOf(MySQLComStmtExecutePacket.class));
+        
MySQLPreparedStatementRegistry.getInstance().unregisterConnection(CONNECTION_ID);
     }
     
     @Test
     public void assertNewInstanceWithComStmtClosePacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_CLOSE,
 payload), instanceOf(MySQLComStmtClosePacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_CLOSE,
 payload, CONNECTION_ID), instanceOf(MySQLComStmtClosePacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComPingPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PING,
 payload), instanceOf(MySQLComPingPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PING,
 payload, CONNECTION_ID), instanceOf(MySQLComPingPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComSleepPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SLEEP,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SLEEP,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComCreateDbPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CREATE_DB,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CREATE_DB,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDropDbPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DROP_DB,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DROP_DB,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComRefreshPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REFRESH,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REFRESH,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComShutDownPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SHUTDOWN,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SHUTDOWN,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStatisticsPacket() throws SQLException 
{
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STATISTICS,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STATISTICS,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComProcessInfoPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_INFO,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_INFO,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComConnectPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComProcessKillPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_KILL,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_KILL,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDebugPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DEBUG,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DEBUG,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComTimePacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TIME,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TIME,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDelayedInsertPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DELAYED_INSERT,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DELAYED_INSERT,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComChangeUserPacket() throws SQLException 
{
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CHANGE_USER,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CHANGE_USER,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComBinlogDumpPacket() throws SQLException 
{
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComTableDumpPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TABLE_DUMP,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TABLE_DUMP,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComConnectOutPacket() throws SQLException 
{
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT_OUT,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT_OUT,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComRegisterSlavePacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REGISTER_SLAVE,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REGISTER_SLAVE,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtSendLongDataPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtResetPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_RESET,
 payload), instanceOf(MySQLComStmtResetPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_RESET,
 payload, CONNECTION_ID), instanceOf(MySQLComStmtResetPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComSetOptionPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SET_OPTION,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SET_OPTION,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtFetchPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_FETCH,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_FETCH,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDaemonPacket() throws SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DAEMON,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DAEMON,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComBinlogDumpGTIDPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP_GTID,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP_GTID,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComResetConnectionPacket() throws 
SQLException {
-        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_RESET_CONNECTION,
 payload), instanceOf(MySQLUnsupportedCommandPacket.class));
+        
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_RESET_CONNECTION,
 payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
 }
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatementRegistryTest.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatementRegistryTest.java
deleted file mode 100644
index e8aeb78..0000000
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLBinaryStatementRegistryTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
-
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.fixture.BinaryStatementRegistryUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-public final class MySQLBinaryStatementRegistryTest {
-    
-    private final String sql = "SELECT * FROM tbl WHERE id=?";
-    
-    @Before
-    @After
-    public void reset() {
-        BinaryStatementRegistryUtil.reset();
-    }
-    
-    @Test
-    public void assertRegisterIfAbsent() {
-        assertThat(MySQLBinaryStatementRegistry.getInstance().register(sql, 
1), is(1));
-        MySQLBinaryStatement actual = 
MySQLBinaryStatementRegistry.getInstance().get(1);
-        assertThat(actual.getSql(), is(sql));
-        assertThat(actual.getParameterCount(), is(1));
-    }
-    
-    @Test
-    public void assertRegisterIfPresent() {
-        assertThat(MySQLBinaryStatementRegistry.getInstance().register(sql, 
1), is(1));
-        assertThat(MySQLBinaryStatementRegistry.getInstance().register(sql, 
1), is(1));
-        MySQLBinaryStatement actual = 
MySQLBinaryStatementRegistry.getInstance().get(1);
-        assertThat(actual.getSql(), is(sql));
-        assertThat(actual.getParameterCount(), is(1));
-    }
-    
-    @Test
-    public void assertUnregisterIfPresent() {
-        MySQLBinaryStatementRegistry.getInstance().register(sql, 1);
-        MySQLBinaryStatementRegistry.getInstance().unregister(1);
-        MySQLBinaryStatement actual = 
MySQLBinaryStatementRegistry.getInstance().get(1);
-        assertNull(actual);
-    }
-}
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java
new file mode 100644
index 0000000..156461b
--- /dev/null
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class MySQLPreparedStatementRegistryTest {
+    
+    private static final int CONNECTION_ID = 1;
+    
+    private static final String SQL = "SELECT * FROM tbl WHERE id=?";
+    
+    @Before
+    public void setup() {
+        
MySQLPreparedStatementRegistry.getInstance().registerConnection(CONNECTION_ID);
+    }
+    
+    @Test
+    public void assertRegisterIfAbsent() {
+        
assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL,
 1), is(1));
+        MySQLPreparedStatement actual = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
+        assertThat(actual.getSql(), is(SQL));
+        assertThat(actual.getParameterCount(), is(1));
+    }
+    
+    @Test
+    public void assertPrepareSameSQL() {
+        
assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL,
 1), is(1));
+        
assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL,
 1), is(2));
+        MySQLPreparedStatement actual = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
+        assertThat(actual.getSql(), is(SQL));
+        assertThat(actual.getParameterCount(), is(1));
+        actual = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(2);
+        assertThat(actual.getSql(), is(SQL));
+        assertThat(actual.getParameterCount(), is(1));
+    }
+    
+    @Test
+    public void assertCloseStatement() {
+        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL,
 1);
+        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).closeStatement(1);
+        MySQLPreparedStatement actual = 
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
+        assertNull(actual);
+    }
+    
+    @After
+    public void tearDown() {
+        
MySQLPreparedStatementRegistry.getInstance().unregisterConnection(CONNECTION_ID);
+    }
+}
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/close/MySQLComStmtClosePacketTest.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/close/MySQLComStmtClosePacketTest.java
index 8f77bb4..12ed368 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/close/MySQLComStmtClosePacketTest.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/close/MySQLComStmtClosePacketTest.java
@@ -46,11 +46,4 @@ public final class MySQLComStmtClosePacketTest {
         MySQLComStmtClosePacket actual = new MySQLComStmtClosePacket(payload);
         actual.write(payload);
     }
-    
-    @Test
-    public void assertRemoveCachedStatement() {
-        when(payload.readInt4()).thenReturn(1);
-        MySQLComStmtClosePacket actual = new MySQLComStmtClosePacket(payload);
-        actual.removeCachedStatement();
-    }
 }
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
index df2a079..9cba1bb 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
@@ -17,10 +17,8 @@
 
 package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute;
 
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLBinaryStatementRegistry;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.fixture.BinaryStatementRegistryUtil;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,21 +37,22 @@ import static org.mockito.Mockito.when;
 @RunWith(MockitoJUnitRunner.class)
 public final class MySQLComStmtExecutePacketTest {
     
+    private static final int CONNECTION_ID = 1;
+    
     @Mock
     private MySQLPacketPayload payload;
     
     @Before
-    @After
-    public void reset() {
-        BinaryStatementRegistryUtil.reset();
+    public void setup() {
+        
MySQLPreparedStatementRegistry.getInstance().registerConnection(CONNECTION_ID);
+        
MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement("SELECT
 id FROM tbl WHERE id=?", 1);
     }
     
     @Test
     public void assertNewWithNotNullParameters() throws SQLException {
-        MySQLBinaryStatementRegistry.getInstance().register("SELECT id FROM 
tbl WHERE id=?", 1);
         when(payload.readInt4()).thenReturn(1);
         when(payload.readInt1()).thenReturn(0, 0, 1);
-        MySQLComStmtExecutePacket actual = new 
MySQLComStmtExecutePacket(payload);
+        MySQLComStmtExecutePacket actual = new 
MySQLComStmtExecutePacket(payload, CONNECTION_ID);
         assertThat(actual.getSequenceId(), is(0));
         assertThat(actual.getSql(), is("SELECT id FROM tbl WHERE id=?"));
         assertThat(actual.getParameters(), 
is(Collections.<Object>singletonList(1)));
@@ -61,10 +60,9 @@ public final class MySQLComStmtExecutePacketTest {
     
     @Test
     public void assertNewWithNullParameters() throws SQLException {
-        MySQLBinaryStatementRegistry.getInstance().register("SELECT id FROM 
tbl WHERE id=?", 1);
         when(payload.readInt4()).thenReturn(1);
         when(payload.readInt1()).thenReturn(0, 1);
-        MySQLComStmtExecutePacket actual = new 
MySQLComStmtExecutePacket(payload);
+        MySQLComStmtExecutePacket actual = new 
MySQLComStmtExecutePacket(payload, CONNECTION_ID);
         assertThat(actual.getSequenceId(), is(0));
         assertThat(actual.getSql(), is("SELECT id FROM tbl WHERE id=?"));
         assertThat(actual.getParameters(), 
is(Collections.singletonList(null)));
@@ -72,10 +70,9 @@ public final class MySQLComStmtExecutePacketTest {
     
     @Test
     public void assertWrite() throws SQLException {
-        MySQLBinaryStatementRegistry.getInstance().register("SELECT id FROM 
tbl WHERE id=?", 1);
         when(payload.readInt4()).thenReturn(1);
         when(payload.readInt1()).thenReturn(0, 1);
-        MySQLComStmtExecutePacket actual = new 
MySQLComStmtExecutePacket(payload);
+        MySQLComStmtExecutePacket actual = new 
MySQLComStmtExecutePacket(payload, CONNECTION_ID);
         actual.write(payload);
         verify(payload, times(2)).writeInt4(1);
         verify(payload, times(4)).writeInt1(1);
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/fixture/BinaryStatementRegistryUtil.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/fixture/BinaryStatementRegistryUtil.java
deleted file mode 100644
index dbdb097..0000000
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/fixture/BinaryStatementRegistryUtil.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.fixture;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLBinaryStatementRegistry;
-
-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class BinaryStatementRegistryUtil {
-    
-    /**
-     * Reset {@code MySQLBinaryStatementRegistry}.
-     */
-    @SneakyThrows(ReflectiveOperationException.class)
-    public static void reset() {
-        Field statementIdAssignerField = 
MySQLBinaryStatementRegistry.class.getDeclaredField("statementIdAssigner");
-        statementIdAssignerField.setAccessible(true);
-        ((Map) 
statementIdAssignerField.get(MySQLBinaryStatementRegistry.getInstance())).clear();
-        Field binaryStatementsField = 
MySQLBinaryStatementRegistry.class.getDeclaredField("binaryStatements");
-        binaryStatementsField.setAccessible(true);
-        ((Map) 
binaryStatementsField.get(MySQLBinaryStatementRegistry.getInstance())).clear();
-        Field sequenceField = 
MySQLBinaryStatementRegistry.class.getDeclaredField("sequence");
-        sequenceField.setAccessible(true);
-        ((AtomicInteger) 
sequenceField.get(MySQLBinaryStatementRegistry.getInstance())).set(0);
-    }
-}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
index b62525c..ddb2098 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
 import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationEngine;
 import org.apache.shardingsphere.proxy.frontend.command.CommandExecuteEngine;
@@ -45,6 +46,7 @@ public final class MySQLFrontendEngine implements DatabaseProtocolFrontendEngine
     
     @Override
     public void release(final BackendConnection backendConnection) {
+        MySQLPreparedStatementRegistry.getInstance().unregisterConnection(backendConnection.getConnectionId());
     }
     
     @Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
index baad9bc..b57cdd1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCharacterSet;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConnectionPhase;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerErrorCode;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLAuthSwitchRequestPacket;
@@ -64,6 +65,7 @@ public final class MySQLAuthenticationEngine implements AuthenticationEngine {
         int result = ConnectionIdGenerator.getInstance().nextId();
         connectionPhase = MySQLConnectionPhase.AUTH_PHASE_FAST_PATH;
         context.writeAndFlush(new MySQLHandshakePacket(result, authenticationHandler.getAuthPluginData()));
+        MySQLPreparedStatementRegistry.getInstance().registerConnection(result);
         return result;
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
index 6ee3edd..69a0555 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
@@ -52,7 +52,7 @@ public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {
     
     @Override
     public MySQLCommandPacket getCommandPacket(final PacketPayload payload, final CommandPacketType type, final BackendConnection backendConnection) throws SQLException {
-        return MySQLCommandPacketFactory.newInstance((MySQLCommandPacketType) type, (MySQLPacketPayload) payload);
+        return MySQLCommandPacketFactory.newInstance((MySQLCommandPacketType) type, (MySQLPacketPayload) payload, backendConnection.getConnectionId());
     }
     
     @Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
index 0ea7afd..aebadb9 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
@@ -80,7 +80,7 @@ public final class MySQLCommandExecutorFactory {
             case COM_STMT_RESET:
                 return new MySQLComStmtResetExecutor((MySQLComStmtResetPacket) commandPacket);
             case COM_STMT_CLOSE:
-                return new MySQLComStmtCloseExecutor((MySQLComStmtClosePacket) commandPacket);
+                return new MySQLComStmtCloseExecutor((MySQLComStmtClosePacket) commandPacket, backendConnection.getConnectionId());
             default:
                 return new MySQLUnsupportedCommandExecutor(commandPacketType);
         }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
index 288271f..0605924 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close;
 
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
@@ -33,10 +34,11 @@ public final class MySQLComStmtCloseExecutor implements CommandExecutor {
     
     private final MySQLComStmtClosePacket packet;
     
+    private final int connectionId;
+    
     @Override
     public Collection<DatabasePacket<?>> execute() {
-        //TODO we need to design the cache in future.
-//        packet.removeCachedStatement();
+        MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionId).closeStatement(packet.getStatementId());
         return Collections.emptyList();
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
index 95cd7aa..25b00a5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.prep
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLBinaryStatementRegistry;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPrepareOKPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
@@ -47,8 +47,6 @@ import java.util.Map;
  */
 public final class MySQLComStmtPrepareExecutor implements CommandExecutor {
     
-    private static final MySQLBinaryStatementRegistry PREPARED_STATEMENT_REGISTRY = MySQLBinaryStatementRegistry.getInstance();
-    
     private final MySQLComStmtPreparePacket packet;
     
     private final BackendConnection backendConnection;
@@ -74,7 +72,7 @@ public final class MySQLComStmtPrepareExecutor implements CommandExecutor {
         }
         int parameterCount = sqlStatement.getParameterCount();
         int projectionCount = getProjectionCount(sqlStatement);
-        int statementId = PREPARED_STATEMENT_REGISTRY.register(packet.getSql(), parameterCount);
+        int statementId = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(backendConnection.getConnectionId()).prepareStatement(packet.getSql(), parameterCount);
         return createPackets(statementId, projectionCount, parameterCount);
     }
     

Reply via email to