This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch fix_sync in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0c3ee605829b3290f5b12e35a450daae1b108e7e Author: qiaojialin <[email protected]> AuthorDate: Wed Jul 13 16:54:36 2022 +0800 fix sync deadlock --- .../service/basic/StandaloneServiceProvider.java | 113 --------------------- .../db/service/thrift/impl/TSServiceImpl.java | 108 ++++++++++++++++++++ 2 files changed, 108 insertions(+), 113 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java index bddb0cac9d..a651665f79 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java +++ b/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java @@ -18,15 +18,7 @@ */ package org.apache.iotdb.db.service.basic; -import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.doublelive.OperationSyncConsumer; -import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector; -import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector; -import org.apache.iotdb.db.doublelive.OperationSyncLogService; -import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils; -import org.apache.iotdb.db.doublelive.OperationSyncProducer; -import org.apache.iotdb.db.doublelive.OperationSyncWriteTask; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.StorageEngineReadonlyException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; @@ -36,73 +28,12 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.sys.FlushPlan; import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.session.pool.SessionPool; -import org.apache.iotdb.tsfile.utils.Pair; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; public class StandaloneServiceProvider extends ServiceProvider { - /* OperationSync module */ - private static final boolean isEnableOperationSync = - IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync(); - private final SessionPool operationSyncsessionPool; - private final OperationSyncProducer operationSyncProducer; - private final OperationSyncDDLProtector operationSyncDDLProtector; - private final OperationSyncLogService operationSyncDDLLogService; public StandaloneServiceProvider() throws QueryProcessException { super(new PlanExecutor()); - if (isEnableOperationSync) { - /* Open OperationSync */ - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - // create SessionPool for OperationSync - operationSyncsessionPool = - new SessionPool( - config.getSecondaryAddress(), - config.getSecondaryPort(), - config.getSecondaryUser(), - config.getSecondaryPassword(), - 5); - - // create operationSyncDDLProtector and operationSyncDDLLogService - operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool); - new Thread(operationSyncDDLProtector).start(); - operationSyncDDLLogService = - new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector); - new Thread(operationSyncDDLLogService).start(); - - // create OperationSyncProducer - BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>> - blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize()); - operationSyncProducer = new OperationSyncProducer(blockingQueue); - - // create OperationSyncDMLProtector and OperationSyncDMLLogService - OperationSyncDMLProtector operationSyncDMLProtector = - new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer); - new Thread(operationSyncDMLProtector).start(); - OperationSyncLogService operationSyncDMLLogService = - new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector); - new Thread(operationSyncDMLLogService).start(); - - // create OperationSyncConsumer - for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) { - OperationSyncConsumer consumer = - new OperationSyncConsumer( - blockingQueue, operationSyncsessionPool, operationSyncDMLLogService); - new Thread(consumer).start(); - } - } else { - operationSyncsessionPool = null; - operationSyncProducer = null; - operationSyncDDLProtector = null; - operationSyncDDLLogService = null; - } } @Override @@ -121,50 +52,6 @@ public class StandaloneServiceProvider extends ServiceProvider { throw new StorageEngineReadonlyException(); } - if (isEnableOperationSync) { - // OperationSync should transmit before execute - transmitOperationSync(plan); - } - return executor.processNonQuery(plan); } - - private void transmitOperationSync(PhysicalPlan physicalPlan) { - - OperationSyncPlanTypeUtils.OperationSyncPlanType planType = - OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan); - if (planType == null) { - // Don't need OperationSync - return; - } - - // serialize physical plan - ByteBuffer buffer; - try { - int size = physicalPlan.getSerializedSize(); - ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size); - DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream); - physicalPlan.serialize(operationSyncSerializeStream); - buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray()); - } catch (IOException e) { - LOGGER.error("OperationSync can't serialize PhysicalPlan", e); - return; - } - - switch (planType) { - case DDLPlan: - // Create OperationSyncWriteTask and wait - OperationSyncWriteTask ddlTask = - new OperationSyncWriteTask( - buffer, - operationSyncsessionPool, - operationSyncDDLProtector, - operationSyncDDLLogService); - ddlTask.run(); - break; - case DMLPlan: - // Put into OperationSyncProducer - operationSyncProducer.put(new Pair<>(buffer, planType)); - } - } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java index 7e3ee0b5e0..5717f3ca5e 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java @@ -25,7 +25,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.OperationType; +import org.apache.iotdb.db.doublelive.OperationSyncConsumer; +import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector; +import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector; +import org.apache.iotdb.db.doublelive.OperationSyncLogService; import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils; +import org.apache.iotdb.db.doublelive.OperationSyncProducer; +import org.apache.iotdb.db.doublelive.OperationSyncWriteTask; import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator; import org.apache.iotdb.db.exception.IoTDBException; import org.apache.iotdb.db.exception.QueryInBatchStatementException; @@ -126,6 +132,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetUsingTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.service.rpc.thrift.TSTracingInfo; import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq; +import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -133,11 +140,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.sql.SQLException; @@ -148,6 +158,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -168,6 +180,14 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; /** Thrift RPC implementation at server side. */ public class TSServiceImpl implements TSIService.Iface { + /* OperationSync module */ + private static final boolean isEnableOperationSync = + IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync(); + private final SessionPool operationSyncsessionPool; + private final OperationSyncProducer operationSyncProducer; + private final OperationSyncDDLProtector operationSyncDDLProtector; + private final OperationSyncLogService operationSyncDDLLogService; + protected class QueryTask implements Callable<TSExecuteStatementResp> { private PhysicalPlan plan; @@ -311,6 +331,51 @@ public class TSServiceImpl implements TSIService.Iface { public TSServiceImpl() { super(); serviceProvider = IoTDB.serviceProvider; + if (isEnableOperationSync) { + /* Open OperationSync */ + IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + // create SessionPool for OperationSync + operationSyncsessionPool = + new SessionPool( + config.getSecondaryAddress(), + config.getSecondaryPort(), + config.getSecondaryUser(), + config.getSecondaryPassword(), + 5); + + // create operationSyncDDLProtector and operationSyncDDLLogService + operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool); + new Thread(operationSyncDDLProtector).start(); + operationSyncDDLLogService = + new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector); + new Thread(operationSyncDDLLogService).start(); + + // create OperationSyncProducer + BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>> + blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize()); + operationSyncProducer = new OperationSyncProducer(blockingQueue); + + // create OperationSyncDMLProtector and OperationSyncDMLLogService + OperationSyncDMLProtector operationSyncDMLProtector = + new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer); + new Thread(operationSyncDMLProtector).start(); + OperationSyncLogService operationSyncDMLLogService = + new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector); + new Thread(operationSyncDMLLogService).start(); + + // create OperationSyncConsumer + for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) { + OperationSyncConsumer consumer = + new OperationSyncConsumer( + blockingQueue, operationSyncsessionPool, operationSyncDMLLogService); + new Thread(consumer).start(); + } + } else { + operationSyncsessionPool = null; + operationSyncProducer = null; + operationSyncDDLProtector = null; + operationSyncDDLLogService = null; + } } @Override @@ -2157,6 +2222,10 @@ public class TSServiceImpl implements TSIService.Iface { protected TSStatus executeNonQueryPlan(PhysicalPlan plan) { try { + if (isEnableOperationSync) { + // OperationSync should transmit before execute + transmitOperationSync(plan); + } return serviceProvider.executeNonQuery(plan) ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully") : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR); @@ -2171,6 +2240,45 @@ public class TSServiceImpl implements TSIService.Iface { "Log in failed. Either you are not authorized or the session has timed out."); } + private void transmitOperationSync(PhysicalPlan physicalPlan) { + + OperationSyncPlanTypeUtils.OperationSyncPlanType planType = + OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan); + if (planType == null) { + // Don't need OperationSync + return; + } + + // serialize physical plan + ByteBuffer buffer; + try { + int size = physicalPlan.getSerializedSize(); + ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size); + DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream); + physicalPlan.serialize(operationSyncSerializeStream); + buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray()); + } catch (IOException e) { + LOGGER.error("OperationSync can't serialize PhysicalPlan", e); + return; + } + + switch (planType) { + case DDLPlan: + // Create OperationSyncWriteTask and wait + OperationSyncWriteTask ddlTask = + new OperationSyncWriteTask( + buffer, + operationSyncsessionPool, + operationSyncDDLProtector, + operationSyncDDLLogService); + ddlTask.run(); + break; + case DMLPlan: + // Put into OperationSyncProducer + operationSyncProducer.put(new Pair<>(buffer, planType)); + } + } + /** Add stat of operation into metrics */ private void addOperationLatency(Operation operation, long startTime) { if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnablePerformanceStat()) {
