This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 61b19b5 Fix PostgreSQL Proxy duplicate ReadyForQuery (#12987)
61b19b5 is described below
commit 61b19b57b7101b1c03eb7b345a225a9db49d32a9
Author: 吴伟杰 <[email protected]>
AuthorDate: Wed Oct 13 19:00:02 2021 +0800
Fix PostgreSQL Proxy duplicate ReadyForQuery (#12987)
---
.../command/PostgreSQLCommandPacketType.java | 18 ++++++++++++++++++
.../command/PostgreSQLCommandPacketTypeTest.java | 22 +++++++++++++---------
.../PostgreSQLMessagePacketTypeTest.java} | 5 ++---
.../command/PostgreSQLCommandExecuteEngine.java | 16 ++++++++++++++--
.../command/PostgreSQLConnectionContext.java | 12 ++++++++++--
.../execute/PostgreSQLComExecuteExecutor.java | 2 +-
6 files changed, 58 insertions(+), 17 deletions(-)
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java
index 014ad2d..def5e13 100644
---
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java
@@ -22,6 +22,10 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.db.protocol.packet.CommandPacketType;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
/**
* Command packet type for PostgreSQL.
*
@@ -49,6 +53,10 @@ public enum PostgreSQLCommandPacketType implements
CommandPacketType, PostgreSQL
TERMINATE('X');
+ private static final Set<PostgreSQLCommandPacketType>
EXTENDED_PROTOCOL_PACKET_TYPE = new
HashSet<>(Arrays.asList(PostgreSQLCommandPacketType.PARSE_COMMAND,
+ PostgreSQLCommandPacketType.BIND_COMMAND,
PostgreSQLCommandPacketType.DESCRIBE_COMMAND,
PostgreSQLCommandPacketType.EXECUTE_COMMAND,
PostgreSQLCommandPacketType.SYNC_COMMAND,
+ PostgreSQLCommandPacketType.CLOSE_COMMAND,
PostgreSQLCommandPacketType.FLUSH_COMMAND));
+
private final char value;
/**
@@ -65,4 +73,14 @@ public enum PostgreSQLCommandPacketType implements
CommandPacketType, PostgreSQL
}
throw new IllegalArgumentException(String.format("Cannot find '%s' in
PostgreSQL command packet type", value));
}
+
+ /**
+ * Check if the packet type is extended protocol packet type.
+ *
+ * @param commandPacketType command packet type
+ * @return is extended protocol packet type
+ */
+ public static boolean isExtendedProtocolPacketType(final
PostgreSQLCommandPacketType commandPacketType) {
+ return EXTENDED_PROTOCOL_PACKET_TYPE.contains(commandPacketType);
+ }
}
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
index 8fed55b..cb89948 100644
---
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
@@ -17,21 +17,25 @@
package org.apache.shardingsphere.db.protocol.postgresql.packet.command;
-import
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
import org.junit.Test;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public final class PostgreSQLCommandPacketTypeTest {
- @Test
- public void assertValueOf() {
-
assertThat(PostgreSQLMessagePacketType.valueOf(PostgreSQLMessagePacketType.AUTHENTICATION_REQUEST.getValue()),
is(PostgreSQLMessagePacketType.AUTHENTICATION_REQUEST));
+ @Test(expected = IllegalArgumentException.class)
+ public void assertValueOfUnknownCommandPacketType() {
+ PostgreSQLCommandPacketType.valueOf(-1);
}
- @Test(expected = IllegalArgumentException.class)
- public void assertGetValueWithIllegalArgument() {
- PostgreSQLMessagePacketType.valueOf(-1);
+ @Test
+ public void assertValueOfExtendedProtocolCommandPacketType() {
+
assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.PARSE_COMMAND));
+
assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.BIND_COMMAND));
+
assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.DESCRIBE_COMMAND));
+
assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.EXECUTE_COMMAND));
+
assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.CLOSE_COMMAND));
+
assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.FLUSH_COMMAND));
+
assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.SYNC_COMMAND));
}
}
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/identifier/PostgreSQLMessagePacketTypeTest.java
similarity index 89%
copy from
shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
copy to
shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/identifier/PostgreSQLMessagePacketTypeTest.java
index 8fed55b..b412c46 100644
---
a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/identifier/PostgreSQLMessagePacketTypeTest.java
@@ -15,15 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.db.protocol.postgresql.packet.command;
+package org.apache.shardingsphere.db.protocol.postgresql.packet.identifier;
-import
org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-public final class PostgreSQLCommandPacketTypeTest {
+public final class PostgreSQLMessagePacketTypeTest {
@Test
public void assertValueOf() {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
index 7a10125..14a9cf1 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
@@ -43,6 +43,7 @@ import
org.apache.shardingsphere.proxy.frontend.postgresql.command.query.text.Po
import
org.apache.shardingsphere.proxy.frontend.postgresql.err.PostgreSQLErrPacketFactory;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.Optional;
/**
@@ -57,24 +58,35 @@ public final class PostgreSQLCommandExecuteEngine
implements CommandExecuteEngin
@Override
public PostgreSQLCommandPacket getCommandPacket(final PacketPayload
payload, final CommandPacketType type, final BackendConnection
backendConnection) {
+ PostgreSQLConnectionContext connectionContext =
PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId());
+ connectionContext.setCurrentPacketType((PostgreSQLCommandPacketType)
type);
return
PostgreSQLCommandPacketFactory.newInstance((PostgreSQLCommandPacketType) type,
(PostgreSQLPacketPayload) payload, backendConnection.getConnectionId());
}
@Override
public CommandExecutor getCommandExecutor(final CommandPacketType type,
final CommandPacket packet, final BackendConnection backendConnection) throws
SQLException {
PostgreSQLConnectionContext connectionContext =
PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId());
+ if (connectionContext.isErrorOccurred() &&
PostgreSQLCommandPacketType.isExtendedProtocolPacketType((PostgreSQLCommandPacketType)
type) && PostgreSQLCommandPacketType.SYNC_COMMAND != type) {
+ return Collections::emptyList;
+ }
return
PostgreSQLCommandExecutorFactory.newInstance((PostgreSQLCommandPacketType)
type, (PostgreSQLCommandPacket) packet, backendConnection, connectionContext);
}
@Override
public DatabasePacket<?> getErrorPacket(final Exception cause, final
BackendConnection backendConnection) {
-
PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId()).getPendingExecutors().clear();
+ PostgreSQLConnectionContext connectionContext =
PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId());
+ if
(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(connectionContext.getCurrentPacketType()))
{
+ connectionContext.setErrorOccurred(true);
+ connectionContext.getPendingExecutors().clear();
+ }
return PostgreSQLErrPacketFactory.newInstance(cause);
}
@Override
public Optional<DatabasePacket<?>> getOtherPacket(final BackendConnection
backendConnection) {
- return Optional.of(new
PostgreSQLReadyForQueryPacket(backendConnection.getTransactionStatus().isInTransaction()));
+ PostgreSQLConnectionContext connectionContext =
PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId());
+ return
PostgreSQLCommandPacketType.isExtendedProtocolPacketType(connectionContext.getCurrentPacketType())
? Optional.empty()
+ : Optional.of(new
PostgreSQLReadyForQueryPacket(backendConnection.getTransactionStatus().isInTransaction()));
}
@Override
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
index 2a1c836..1ca380d 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
@@ -17,9 +17,11 @@
package org.apache.shardingsphere.proxy.frontend.postgresql.command;
+import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import
org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
+import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatement;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
@@ -37,17 +39,21 @@ import java.util.Optional;
/**
* PostgreSQL connection context.
*/
+@Getter
@Setter
public final class PostgreSQLConnectionContext {
+ @Getter(AccessLevel.NONE)
private final Map<String, PostgreSQLPortal> portals = new
LinkedHashMap<>();
- @Getter
private final Collection<CommandExecutor> pendingExecutors = new
LinkedList<>();
- @Getter
private long updateCount;
+ private PostgreSQLCommandPacketType currentPacketType;
+
+ private boolean errorOccurred;
+
/**
* Create a portal.
*
@@ -124,5 +130,7 @@ public final class PostgreSQLConnectionContext {
public void clearContext() {
pendingExecutors.clear();
updateCount = 0;
+ currentPacketType = null;
+ errorOccurred = false;
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
index 05749e9..d22dfc6 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
@@ -89,7 +89,7 @@ public final class PostgreSQLComExecuteExecutor implements
CommandExecutor {
}
String sqlCommand =
PostgreSQLCommand.valueOf(portal.getSqlStatement().getClass()).map(PostgreSQLCommand::getTag).orElse("");
PostgreSQLCommandCompletePacket result = new
PostgreSQLCommandCompletePacket(sqlCommand, Math.max(dataRows,
connectionContext.getUpdateCount()));
- connectionContext.clearContext();
+ connectionContext.setUpdateCount(0);
return result;
}