This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch ssl_between_nodes in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1ce4494eccf814586bf0720e461b740b17ffb80c Author: HTHou <[email protected]> AuthorDate: Thu Jul 17 16:02:06 2025 +0800 Grok4 is powerful --- .../iotdb/rpc/NettyTNonBlockingTransport.java | 522 +++++++++------------ .../iotdb/confignode/conf/ConfigNodeConfig.java | 4 +- .../manager/load/service/HeartbeatService.java | 20 +- .../apache/iotdb/commons/conf/CommonConfig.java | 2 +- 4 files changed, 239 insertions(+), 309 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java index 7584ac68792..1f2456c4386 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java @@ -48,40 +48,51 @@ import javax.net.ssl.TrustManagerFactory; import java.io.FileInputStream; import java.io.IOException; -import java.lang.reflect.Field; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; import java.security.KeyStore; -import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +/** + * A non-blocking Thrift transport implementation using Netty for asynchronous I/O. Integrates with + * Thrift's TAsyncClientManager using a dummy local SocketChannel for selector events. Supports + * SSL/TLS for secure communication. + */ public class NettyTNonBlockingTransport extends TNonblockingTransport { private static final Logger logger = LoggerFactory.getLogger(NettyTNonBlockingTransport.class); private final String host; private final int port; - private final int connectTimeoutMs = 60000; + private final int connectTimeoutMs; + private final long sslHandshakeTimeoutMs; private final EventLoopGroup eventLoopGroup; private final Bootstrap bootstrap; - private Channel channel; + private volatile Channel channel; private final AtomicBoolean connected = new AtomicBoolean(false); private final AtomicBoolean connecting = new AtomicBoolean(false); private final BlockingQueue<ByteBuf> readQueue = new LinkedBlockingQueue<>(); - private final Object writeLock = new Object(); - private NettySelectionKeyAdapter selectionKeyAdapter; + private final Object lock = new Object(); - // SSL 配置 + // SSL configuration private final String keystorePath; private final String keystorePassword; private final String truststorePath; private final String truststorePassword; + // Dummy local socket for selector integration + private ServerSocketChannel dummyServer; + private java.nio.channels.SocketChannel dummyClient; + private java.nio.channels.SocketChannel dummyServerAccepted; + private int dummyPort; + private Selector selector; // Stored for wakeup if needed + public NettyTNonBlockingTransport( String host, int port, @@ -93,15 +104,34 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { super(new TConfiguration()); this.host = host; this.port = port; + this.connectTimeoutMs = 60000; + this.sslHandshakeTimeoutMs = 30000; this.eventLoopGroup = new NioEventLoopGroup(); this.bootstrap = new Bootstrap(); this.keystorePath = keystorePath; this.keystorePassword = keystorePassword; this.truststorePath = truststorePath; this.truststorePassword = truststorePassword; + initDummyChannels(); initBootstrap(); } + /** Initializes dummy local channels for selector event simulation. */ + private void initDummyChannels() throws TTransportException { + try { + dummyServer = ServerSocketChannel.open(); + dummyServer.configureBlocking(false); + dummyServer.bind(new InetSocketAddress("localhost", 0)); + dummyPort = dummyServer.socket().getLocalPort(); + logger.debug("Dummy server bound to localhost:{}", dummyPort); + + dummyClient = java.nio.channels.SocketChannel.open(); + dummyClient.configureBlocking(false); + } catch (IOException e) { + throw new TTransportException("Failed to initialize dummy channels", e); + } + } + private void initBootstrap() { bootstrap .group(eventLoopGroup) @@ -113,28 +143,30 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { - logger.info("Initializing channel for {}:{}", host, port); + logger.debug("Initializing channel for {}:{}", host, port); ChannelPipeline pipeline = ch.pipeline(); SslContext sslContext = createSslContext(); SslHandler sslHandler = sslContext.newHandler(ch.alloc(), host, port); - // 增加握手超时时间 - sslHandler.setHandshakeTimeoutMillis(30000); + sslHandler.setHandshakeTimeoutMillis(sslHandshakeTimeoutMs); pipeline.addLast("ssl", sslHandler); - // 添加SSL握手完成监听器 sslHandler .handshakeFuture() .addListener( future -> { if (future.isSuccess()) { - logger.info("SSL handshake completed successfully"); + logger.info( + "SSL handshake completed successfully for {}:{}", host, port); } else { - logger.info("SSL handshake failed: {}", future.cause().getMessage()); + logger.error( + "SSL handshake failed for {}:{}: {}", + host, + port, + future.cause().getMessage()); } }); - // 添加业务处理器 pipeline.addLast("handler", new NettyTransportHandler()); } }); @@ -143,7 +175,6 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { private SslContext createSslContext() throws Exception { SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); - // 配置 KeyStore(客户端证书) if (keystorePath != null && keystorePassword != null) { KeyStore keyStore = KeyStore.getInstance("JKS"); try (FileInputStream fis = new FileInputStream(keystorePath)) { @@ -155,7 +186,6 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { sslContextBuilder.keyManager(kmf); } - // 配置 TrustStore(信任的服务器证书) if (truststorePath != null && truststorePassword != null) { KeyStore trustStore = KeyStore.getInstance("JKS"); try (FileInputStream fis = new FileInputStream(truststorePath)) { @@ -172,40 +202,43 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { @Override public boolean isOpen() { - return channel != null && channel.isActive() && connected.get(); + synchronized (lock) { + return connected.get() || (channel != null && channel.isActive()); + } } @Override public void open() throws TTransportException { - throw new RuntimeException("open() is not implemented for NettyTNonblockingTransport"); + throw new TTransportException( + TTransportException.NOT_OPEN, "open() is not implemented; use startConnect() instead"); } - /** Perform a nonblocking read into buffer. */ + @Override public int read(ByteBuffer buffer) throws TTransportException { + boolean readingResponseSize = buffer.remaining() == 4; + if (!isOpen()) { - logger.info("Transport not open for ByteBuffer read"); + logger.debug("Transport not open for ByteBuffer read"); throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); } + ByteBuf byteBuf = null; try { - ByteBuf byteBuf = readQueue.poll(); + byteBuf = readQueue.poll(); if (byteBuf == null) { - logger.info("No data available for ByteBuffer read (non-blocking)"); - return 0; // 非阻塞读取,没有数据时返回 0 + logger.info("No data available for ByteBuffer read"); + return 0; } int available = Math.min(buffer.remaining(), byteBuf.readableBytes()); if (available > 0) { - // 从 ByteBuf 读取数据到 ByteBuffer byte[] tempArray = new byte[available]; byteBuf.readBytes(tempArray); buffer.put(tempArray); - logger.info( "Read {} bytes into ByteBuffer, remaining space: {}", available, buffer.remaining()); } - // 如果还有剩余数据,创建一个新的 ByteBuf 包含剩余数据 if (byteBuf.readableBytes() > 0) { ByteBuf remaining = byteBuf.slice(); remaining.retain(); @@ -213,41 +246,58 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { logger.info("Put back {} remaining bytes", remaining.readableBytes()); } - byteBuf.release(); - return available; + // Drain dummy channel to clear OP_READ + ByteBuffer discard = ByteBuffer.allocate(16); + try { + while (dummyClient.read(discard) > 0) { + discard.clear(); + } + } catch (IOException e) { + logger.warn("Failed to drain dummy channel", e); + } + if (readingResponseSize) { + // Trigger OP_READ on dummy by writing dummy byte + if (dummyServerAccepted != null) { + ByteBuffer dummyByte = ByteBuffer.wrap(new byte[1]); + dummyServerAccepted.write(dummyByte); + } + // Wakeup selector if needed + if (selector != null) { + selector.wakeup(); + } + } + return available; } catch (Exception e) { - logger.warn("ByteBuffer read failed: ", e); - throw new TTransportException(TTransportException.UNKNOWN, e); + logger.error("ByteBuffer read failed: {}", e.getMessage()); + throw new TTransportException(TTransportException.UNKNOWN, "Read failed", e); + } finally { + if (byteBuf != null) { + byteBuf.release(); + } } } - /** Reads from the underlying input stream if not null. */ @Override public int read(byte[] buf, int off, int len) throws TTransportException { if (!isOpen()) { - logger.info( - "Transport not open for read - channel: " - + (channel != null ? channel.isActive() : "null") - + ", connected: " - + connected.get()); + logger.debug("Transport not open for read"); throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); } try { - // 使用 ByteBuffer 包装数组进行读取 ByteBuffer buffer = ByteBuffer.wrap(buf, off, len); return read(buffer); } catch (Exception e) { - logger.warn("Read failed: ", e); - throw new TTransportException(TTransportException.UNKNOWN, e); + logger.error("Read failed: {}", e.getMessage()); + throw new TTransportException(TTransportException.UNKNOWN, "Read failed", e); } } - /** Perform a nonblocking write of the data in buffer; */ + @Override public int write(ByteBuffer buffer) throws TTransportException { if (!isOpen()) { - logger.info("Transport not open for ByteBuffer write"); + logger.debug("Transport not open for ByteBuffer write"); throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); } @@ -256,170 +306,184 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { return 0; } - logger.info("Writing " + remaining + " bytes from ByteBuffer"); - - synchronized (writeLock) { - // 创建 ByteBuf 从 ByteBuffer - ByteBuf byteBuf = Unpooled.buffer(); - byteBuf.writeBytes(buffer); - ChannelFuture future = channel.writeAndFlush(byteBuf); + logger.info("Writing {} bytes from ByteBuffer", remaining); - final int bytesToWrite = remaining; - future.addListener( - (GenericFutureListener<ChannelFuture>) - future1 -> { - if (future1.isSuccess()) { - logger.info( - "ByteBuffer write completed successfully: " + bytesToWrite + " bytes"); - } else { - logger.warn("ByteBuffer write failed: " + future1.cause().getMessage()); - future1.cause().printStackTrace(); - } - }); + synchronized (lock) { + ByteBuf byteBuf = Unpooled.buffer(remaining); + try { + byteBuf.writeBytes(buffer); + ChannelFuture future = channel.writeAndFlush(byteBuf); + future.addListener( + (GenericFutureListener<ChannelFuture>) + future1 -> { + if (future1.isSuccess()) { + logger.debug("ByteBuffer write completed successfully: {} bytes", remaining); + } else { + logger.error("ByteBuffer write failed: {}", future1.cause().getMessage()); + } + }); + return remaining; + } catch (Exception e) { + byteBuf.release(); + logger.error("ByteBuffer write failed: {}", e.getMessage()); + throw new TTransportException(TTransportException.UNKNOWN, "Write failed", e); + } } - - // 对于非阻塞写入,我们假设所有数据都能写入 - // 实际的写入状态通过 Future 监听器处理 - return remaining; } - /** Writes to the underlying output stream if not null. */ @Override public void write(byte[] buf, int off, int len) throws TTransportException { if (!isOpen()) { - logger.info("Transport not open for write"); + logger.debug("Transport not open for write"); throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); } - // 使用 ByteBuffer 包装数组进行写入 ByteBuffer buffer = ByteBuffer.wrap(buf, off, len); write(buffer); } @Override public void flush() throws TTransportException { - // Not supported by SocketChannel. + if (!isOpen()) { + logger.debug("Transport not open for flush"); + throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); + } + synchronized (lock) { + channel.flush(); + logger.debug("Flushed channel"); + } } @Override public void close() { - connected.set(false); - if (channel != null) { - channel.close(); + synchronized (lock) { + connected.set(false); + if (channel != null) { + channel.close(); + channel = null; + logger.info("Channel closed for {}:{}", host, port); + } + try { + if (dummyClient != null) { + dummyClient.close(); + } + if (dummyServerAccepted != null) { + dummyServerAccepted.close(); + } + if (dummyServer != null) { + dummyServer.close(); + } + } catch (IOException e) { + logger.warn("Failed to close dummy channels", e); + } + eventLoopGroup.shutdownGracefully(); + logger.info("EventLoopGroup shutdown initiated"); } - eventLoopGroup.shutdownGracefully(); } @Override public boolean startConnect() throws IOException { if (connected.get() || connecting.get()) { - logger.info("Starting connection return " + (connected.get() || connecting.get())); + logger.debug("Connection already started or established for {}:{}", host, port); return connected.get(); } if (!connecting.compareAndSet(false, true)) { + logger.debug("Concurrent connection attempt detected for {}:{}", host, port); return false; } - logger.info("Starting connection to " + host + ":" + port); + + logger.info("Starting connection to {}:{}", host, port); try { + // Initiate dummy connect, it will pend until accept + dummyClient.connect(new InetSocketAddress("localhost", dummyPort)); + + // Initiate Netty connect ChannelFuture future = bootstrap.connect(host, port); future.addListener( (GenericFutureListener<ChannelFuture>) future1 -> { - if (future1.isSuccess()) { - logger.info("Connection established successfully"); - channel = future1.channel(); - connected.set(true); - if (selectionKeyAdapter != null) { - selectionKeyAdapter.setConnected(true); + synchronized (lock) { + if (future1.isSuccess()) { + logger.info("Connection established successfully to {}:{}", host, port); + channel = future1.channel(); + connected.set(true); + // Now accept the dummy connection to complete it + try { + dummyServerAccepted = dummyServer.accept(); + if (dummyServerAccepted != null) { + dummyServerAccepted.configureBlocking(false); + logger.debug("Dummy server accepted connection"); + // Wakeup selector to detect OP_CONNECT + if (selector != null) { + selector.wakeup(); + } + } + } catch (IOException e) { + logger.warn("Failed to accept dummy connection", e); + } + } else { + logger.error( + "Connection failed to {}:{}: {}", host, port, future1.cause().getMessage()); } + connecting.set(false); } - connecting.set(false); }); future.get(); - return true; // 异步连接,立即返回 false + return false; // Return false to indicate pending connect for dummy } catch (Exception e) { connecting.set(false); - // throw new IOException("Failed to start connection", e); - return false; + logger.error("Failed to start connection to {}:{}", host, port, e.getMessage()); + throw new IOException("Failed to start connection", e); } } @Override public boolean finishConnect() throws IOException { - return connected.get(); + synchronized (lock) { + boolean dummyFinished = dummyClient.finishConnect(); + boolean isConnected = connected.get() && dummyFinished; + logger.debug( + "finishConnect called, netty connected: {}, dummy finished: {}", + connected.get(), + dummyFinished); + return isConnected; + } } @Override public SelectionKey registerSelector(Selector selector, int interests) throws IOException { - if (selectionKeyAdapter == null) { - selectionKeyAdapter = new NettySelectionKeyAdapter(this, selector, interests); - - // 尝试通过反射获取 selectedKeys 的可修改引用 - try { - // 尝试不同的字段名,因为不同的 Selector 实现可能使用不同的字段名 - String[] possibleFieldNames = {"selectedKeys", "publicSelectedKeys", "keys"}; - Field selectedKeysField = null; - - for (String fieldName : possibleFieldNames) { - try { - selectedKeysField = selector.getClass().getSuperclass().getDeclaredField(fieldName); - break; - } catch (NoSuchFieldException e) { - // 继续尝试下一个字段名 - } - } - - if (selectedKeysField != null) { - selectedKeysField.setAccessible(true); - Object selectedKeysObj = selectedKeysField.get(selector); - - if (selectedKeysObj instanceof Set) { - @SuppressWarnings("unchecked") - Set<SelectionKey> selectedKeys = (Set<SelectionKey>) selectedKeysObj; - selectionKeyAdapter.setSelectedKeysReference(selectedKeys); - logger.info("Successfully obtained selectedKeys reference via reflection"); - try { - selectionKeyAdapter.selectedKeysReference.add(selectionKeyAdapter); - selector.wakeup(); - } catch (Exception e) { - logger.warn("Failed to add to selectedKeys: " + e.getMessage()); - } - } - } - } catch (Exception e) { - logger.warn("Failed to access selectedKeys via reflection: " + e.getMessage()); - // 继续执行,使用备用方案 - } - - } else { - selectionKeyAdapter.interestOps(interests); + synchronized (lock) { + this.selector = selector; + return dummyClient.register(selector, interests); } - - return selectionKeyAdapter; } @Override public String toString() { - return "[remote: " + getRemoteAddress() + ", local: " + getLocalAddress() + "]"; + synchronized (lock) { + return "[remote: " + getRemoteAddress() + ", local: " + getLocalAddress() + "]"; + } + } + + public SocketAddress getRemoteAddress() { + synchronized (lock) { + return channel != null ? channel.remoteAddress() : null; + } + } + + public SocketAddress getLocalAddress() { + synchronized (lock) { + return channel != null ? channel.localAddress() : null; + } } - // Netty 处理器 private class NettyTransportHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - logger.info("Channel active: " + ctx.channel().remoteAddress()); - - connected.set(true); - - // 更新 SelectionKey 状态 - if (selectionKeyAdapter != null) { - selectionKeyAdapter.setConnected(true); - selectionKeyAdapter.setReadReady(true); - } - + logger.info("Channel active: {}", ctx.channel().remoteAddress()); super.channelActive(ctx); } @@ -427,172 +491,40 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf byteBuf = (ByteBuf) msg; - logger.info("Received " + byteBuf.readableBytes() + " bytes"); - - // 保留引用计数,将数据放入读取队列 - readQueue.offer(byteBuf.retain()); - byteBuf.release(); // 释放原始引用 - - // 通知选择器适配器有数据可读 - if (selectionKeyAdapter != null) { - selectionKeyAdapter.setReadReady(true); + logger.info("Received {} bytes", byteBuf.readableBytes()); + + synchronized (lock) { + readQueue.offer(byteBuf.retain()); + // Trigger OP_READ on dummy by writing dummy byte + if (dummyServerAccepted != null) { + ByteBuffer dummyByte = ByteBuffer.wrap(new byte[1]); + dummyServerAccepted.write(dummyByte); + } + // Wakeup selector if needed + if (selector != null) { + selector.wakeup(); + } } + byteBuf.release(); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - logger.info("Channel inactive"); - - connected.set(false); - connecting.set(false); - - // 更新 SelectionKey 状态 - if (selectionKeyAdapter != null) { - selectionKeyAdapter.setConnected(false); - selectionKeyAdapter.setReadReady(false); + logger.info("Channel inactive: {}", ctx.channel().remoteAddress()); + synchronized (lock) { + connected.set(false); + connecting.set(false); } - super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.warn("Channel exception: {}", cause.getMessage()); - - // 更新 SelectionKey 状态 - if (selectionKeyAdapter != null) { - selectionKeyAdapter.setConnected(false); - selectionKeyAdapter.setReadReady(false); - selectionKeyAdapter.cancel(); // 取消 SelectionKey - } - - ctx.close(); - } - } - - // SelectionKey 适配器类 - private static class NettySelectionKeyAdapter extends SelectionKey { - private final NettyTNonBlockingTransport transport; - private Selector selector; - private int interestOps; - private int readyOps = 0; - private volatile boolean connected = false; - private volatile boolean readReady = false; - private volatile boolean valid = true; - private Set<SelectionKey> selectedKeysReference; - - public NettySelectionKeyAdapter( - NettyTNonBlockingTransport transport, Selector selector, int ops) { - this.transport = transport; - this.selector = selector; - this.interestOps = ops; - } - - public void setSelectedKeysReference(Set<SelectionKey> selectedKeys) { - this.selectedKeysReference = selectedKeys; - } - - @Override - public SelectableChannel channel() { - return null; // Netty 管理通道 - } - - @Override - public Selector selector() { - return selector; - } - - @Override - public void cancel() { - this.valid = false; - if (selectedKeysReference != null) { - selectedKeysReference.remove(this); - } - logger.info("SelectionKey cancelled"); - } - - @Override - public boolean isValid() { - return valid; - } - - @Override - public SelectionKey interestOps(int ops) { - this.interestOps = ops; - logger.info("SelectionKey interestOps set to: " + ops); - updateSelectorIfReady(); - return this; - } - - @Override - public int interestOps() { - return interestOps; - } - - @Override - public int readyOps() { - int ops = 0; - - // 检查连接状态 - if (connected && (interestOps & OP_CONNECT) != 0) { - ops |= OP_CONNECT; - } - - // 检查读取状态 - if (readReady && (interestOps & OP_READ) != 0) { - ops |= OP_READ; - } - - // 写入通常总是就绪的(如果连接是活跃的) - if ((interestOps & OP_WRITE) != 0 && transport.isOpen()) { - ops |= OP_WRITE; - } - - if (ops != readyOps) { - logger.info("SelectionKey readyOps changed: " + readyOps + " -> " + ops); - } - - readyOps = ops; - return readyOps; - } - - public void setConnected(boolean connected) { - this.connected = connected; - updateSelectorIfReady(); - } - - public void setReadReady(boolean readReady) { - if (this.readReady != readReady) { - logger.info("ReadReady changed: {} -> {}", this.readReady, readReady); + logger.error("Channel exception: {}", cause.getMessage()); + synchronized (lock) { + ctx.close(); } - this.readReady = readReady; - updateSelectorIfReady(); } - - private void updateSelectorIfReady() { - if (selector != null && isValid() && selectedKeysReference != null) { - int ready = readyOps(); - if (ready != 0) { - try { - selectedKeysReference.add(this); - selector.wakeup(); - logger.info("Added self to selectedKeys via reference, readyOps: " + ready); - } catch (Exception e) { - logger.warn("Failed to add to selectedKeys: " + e.getMessage()); - } - } - } - } - } - - // 辅助方法:获取远程地址 - public SocketAddress getRemoteAddress() { - return channel != null ? channel.remoteAddress() : null; - } - - // 辅助方法:获取本地地址 - public SocketAddress getLocalAddress() { - return channel != null ? channel.localAddress() : null; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index a8d5c330c5b..50c8f5bc9a0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -61,10 +61,10 @@ public class ConfigNodeConfig { private int configRegionId = 0; /** ConfigNodeGroup consensus protocol. */ - private String configNodeConsensusProtocolClass = ConsensusFactory.SIMPLE_CONSENSUS; + private String configNodeConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS; /** Schema region consensus protocol. */ - private String schemaRegionConsensusProtocolClass = ConsensusFactory.SIMPLE_CONSENSUS; + private String schemaRegionConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS; /** Default number of SchemaRegion replicas. */ private int schemaReplicationFactor = 1; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index e079fac021f..59545dbec0a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -27,7 +27,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; -import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.confignode.client.async.AsyncAINodeHeartbeatClientPool; import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool; @@ -35,7 +34,6 @@ import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool import org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler; import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler; import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler; -import org.apache.iotdb.confignode.client.sync.SyncDataNodeHeartbeatClientPool; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; @@ -278,15 +276,15 @@ public class HeartbeatService { configManager.getPipeManager().getPipeRuntimeCoordinator()); configManager.getClusterQuotaManager().updateSpaceQuotaUsage(); addConfigNodeLocationsToReq(dataNodeId, heartbeatReq); -// if (CommonDescriptor.getInstance().getConfig().isEnableSSL()) { -// SyncDataNodeHeartbeatClientPool.getInstance() -// .getDataNodeHeartBeat( -// dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); -// } else { - AsyncDataNodeHeartbeatClientPool.getInstance() - .getDataNodeHeartBeat( - dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); -// } + // if (CommonDescriptor.getInstance().getConfig().isEnableSSL()) { + // SyncDataNodeHeartbeatClientPool.getInstance() + // .getDataNodeHeartBeat( + // dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); + // } else { + AsyncDataNodeHeartbeatClientPool.getInstance() + .getDataNodeHeartBeat( + dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); + // } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 783769edaa4..852e55a65d3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -406,7 +406,7 @@ public class CommonConfig { private volatile Pattern trustedUriPattern = Pattern.compile("file:.*"); /** Enable the thrift Service ssl. */ - private boolean enableSSL = false; + private boolean enableSSL = true; /** ssl key Store Path. */ private String keyStorePath = "/Users/ht/.keystore";
