RATIS-334 Implement server membership for LogService Metadata Service Signed-off-by: Josh Elser <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/85d8e025 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/85d8e025 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/85d8e025 Branch: refs/heads/master Commit: 85d8e025ffe332dc1329fbecd41d841fbc136db8 Parents: cce03d0 Author: Sergey Soldatov <[email protected]> Authored: Fri Oct 19 01:12:36 2018 -0700 Committer: Josh Elser <[email protected]> Committed: Wed Oct 24 19:18:19 2018 -0400 ---------------------------------------------------------------------- ratis-logservice/pom.xml | 134 +++++++ .../apache/ratis/logservice/api/LogInfo.java | 49 +++ .../apache/ratis/logservice/api/LogName.java | 4 +- .../ratis/logservice/api/LogStateMachine.java | 151 ++++---- .../logservice/client/LogServiceClient.java | 146 ++++++++ .../ratis/logservice/common/Constants.java | 31 ++ .../common/LogAlreadyExistException.java | 27 ++ .../logservice/common/LogNotFoundException.java | 28 ++ .../common/NoEnoughWorkersException.java | 32 ++ .../ratis/logservice/impl/LogReaderImpl.java | 3 +- .../ratis/logservice/impl/LogServiceImpl.java | 76 ++-- .../ratis/logservice/impl/LogStreamImpl.java | 4 +- .../ratis/logservice/impl/LogWriterImpl.java | 18 +- .../server/ManagementStateMachine.java | 29 ++ .../ratis/logservice/server/MasterServer.java | 177 +++++++++ .../logservice/server/MetaStateMachine.java | 371 +++++++++++++++++++ .../logservice/util/LogServiceProtoUtil.java | 160 ++------ .../ratis/logservice/util/LogServiceUtils.java | 54 +++ .../logservice/util/MetaServiceProtoUtil.java | 209 +++++++++++ .../logservice/worker/LogServiceWorker.java | 130 +++++++ .../src/main/proto/LogService.proto | 135 +++++++ .../src/main/proto/MetaService.proto | 143 +++++++ .../src/main/resources/log4j.properties | 23 ++ .../ratis/logservice/LogServiceBaseTest.java | 9 +- .../ratis/logservice/server/TestMetaServer.java | 200 ++++++++++ .../logservice/util/LogServiceCluster.java | 156 ++++++++ .../util/TestLogServiceProtoUtil.java | 109 +----- ratis-proto/src/main/proto/Logservice.proto | 178 --------- 28 files changed, 2224 insertions(+), 562 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-logservice/pom.xml b/ratis-logservice/pom.xml index a3c114d..9556dcd 100644 --- a/ratis-logservice/pom.xml +++ b/ratis-logservice/pom.xml @@ -23,6 +23,135 @@ <artifactId>ratis-logservice</artifactId> <name>Apache Ratis LogService</name> + <build> + <extensions> + <!-- Use os-maven-plugin to initialize the "os.detected" properties --> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.5.0.Final</version> + </extension> + </extensions> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <compilerArgs> + <!-- disable all javac warnings for shaded sources --> + <arg>-Xlint:none</arg> + <arg>-XDignore.symbol.file</arg> + </compilerArgs> + <showWarnings>false</showWarnings> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration/> + </plugin> + <!-- Make a jar and put the sources in the jar --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + <plugin> + <!--Make it so assembly:single does nothing in here--> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <skipAssembly>true</skipAssembly> + </configuration> + </plugin> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <configuration> + <includes> + <include>**/*.proto</include> + </includes> + <protocArtifact> + com.google.protobuf:protoc:${shaded.protobuf.version}:exe:${os.detected.classifier} + </protocArtifact> + <!-- Place these in a location that compiler-plugin is already looking --> + <outputDirectory>${project.build.directory}/generated-sources</outputDirectory> + <!-- With multiple executions, this must be `false` otherwise we wipe out the previous execution --> + <clearOutputDirectory>false</clearOutputDirectory> + </configuration> + <executions> + <execution> + <id>compile-protobuf</id> + <phase>generate-sources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>com.google.code.maven-replacer-plugin</groupId> + <artifactId>replacer</artifactId> + <version>1.5.3</version> + <executions> + <execution> + <phase>process-sources</phase> + <goals> + <goal>replace</goal> + </goals> + </execution> + </executions> + <configuration> + <basedir>${project.build.directory}/generated-sources/org/apache/ratis/logservice/proto</basedir> + <includes> + <include>**/*.java</include> + </includes> + <replacements> + <replacement> + <token>([^\.])com.google</token> + <value>$1org.apache.ratis.thirdparty.com.google</value> + </replacement> + <replacement> + <token>([^\.])io.grpc</token> + <value>$1org.apache.ratis.thirdparty.io.grpc</value> + </replacement> + </replacements> + </configuration> + </plugin> + </plugins> + <pluginManagement> + <plugins> + <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId> + com.google.code.maven-replacer-plugin + </groupId> + <artifactId>replacer</artifactId> + <versionRange> + [1.5.3,) + </versionRange> + <goals> + <goal>replace</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore></ignore> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + <dependencies> <!-- Ratis dependencies --> <dependency> @@ -112,5 +241,10 @@ <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.beust</groupId> + <artifactId>jcommander</artifactId> + <version>1.72</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogInfo.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogInfo.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogInfo.java new file mode 100644 index 0000000..63a07d1 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogInfo.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.api; + +import org.apache.ratis.protocol.RaftGroup; + +/** + * This classs represent the pair of LogName -> RaftGroup + */ +public class LogInfo { + private final RaftGroup raftGroup; + private final LogName logName; + public LogInfo(LogName logName, RaftGroup raftGroup) { + this.logName = logName; + this.raftGroup = raftGroup; + } + + /** + * Log name + * @return + */ + public LogName getLogName() { + return logName; + } + + /** + * Raft group + * @return + */ + public RaftGroup getRaftGroup() { + return raftGroup; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java index 3405340..00496dc 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java @@ -21,7 +21,7 @@ import static java.util.Objects.requireNonNull; import java.util.Objects; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogNameProto; +import org.apache.ratis.logservice.proto.LogServiceProtos; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; @@ -83,7 +83,7 @@ public class LogName { public static LogName parseFrom(ByteString logName) throws InvalidProtocolBufferException { - LogNameProto logNameProto = LogNameProto.parseFrom(logName); + LogServiceProtos.LogNameProto logNameProto = LogServiceProtos.LogNameProto.parseFrom(logName); return new LogName(logNameProto.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java index 1a3edc2..1617c94 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java @@ -35,23 +35,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ratis.logservice.impl.LogStreamImpl; +import org.apache.ratis.logservice.proto.LogServiceProtos.*; import org.apache.ratis.logservice.util.LogServiceProtoUtil; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogRequestProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; @@ -203,10 +190,6 @@ public class LogStateMachine extends BaseStateMachine { return processGetLengthRequest(logServiceRequestProto); case STARTINDEXQUERY: return processGetStartIndexRequest(logServiceRequestProto); - case LISTLOGS: - return processListLogsRequest(); - case GETLOG: - return processGetLogRequest(logServiceRequestProto); case GETSTATE: return processGetStateRequest(logServiceRequestProto); default: @@ -344,14 +327,14 @@ public class LogStateMachine extends BaseStateMachine { LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData()); switch (logServiceRequestProto.getRequestCase()) { - case CREATELOG: - return processCreateLogRequest(logServiceRequestProto); - case ARCHIVELOG: - return processArchiveLog(logServiceRequestProto); +// case CREATELOG: +// return processCreateLogRequest(logServiceRequestProto); +// case ARCHIVELOG: +// return processArchiveLog(logServiceRequestProto); case CLOSELOG: return processCloseLog(logServiceRequestProto); - case DELETELOG: - return processDeleteLog(logServiceRequestProto); +// case DELETELOG: +// return processDeleteLog(logServiceRequestProto); case APPENDREQUEST: return processAppendRequest(trx, logServiceRequestProto); case SYNCREQUEST: @@ -366,17 +349,17 @@ public class LogStateMachine extends BaseStateMachine { } } - private CompletableFuture<Message> - processDeleteLog(LogServiceRequestProto logServiceRequestProto) { - DeleteLogRequestProto deleteLog = logServiceRequestProto.getDeleteLog(); - LogName logName = LogServiceProtoUtil.toLogName(deleteLog.getLogName()); - try (final AutoCloseableLock writeLock = writeLock()) { - state.remove(logName); - } - // TODO need to handle exceptions while operating with files. - return CompletableFuture.completedFuture(Message - .valueOf(DeleteLogReplyProto.newBuilder().build().toByteString())); - } +// private CompletableFuture<Message> +// processDeleteLog(LogServiceRequestProto logServiceRequestProto) { +// DeleteLogRequestProto deleteLog = logServiceRequestProto.getDeleteLog(); +// LogName logName = LogServiceProtoUtil.toLogName(deleteLog.getLogName()); +// try (final AutoCloseableLock writeLock = writeLock()) { +// state.remove(logName); +// } +// // TODO need to handle exceptions while operating with files. +// return CompletableFuture.completedFuture(Message +// .valueOf(DeleteLogReplyProto.newBuilder().build().toByteString())); +// } private CompletableFuture<Message> processCloseLog(LogServiceRequestProto logServiceRequestProto) { CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog(); @@ -387,14 +370,14 @@ public class LogStateMachine extends BaseStateMachine { .valueOf(CloseLogReplyProto.newBuilder().build().toByteString())); } - private CompletableFuture<Message> - processArchiveLog(LogServiceRequestProto logServiceRequestProto) { - ArchiveLogRequestProto archiveLog = logServiceRequestProto.getArchiveLog(); - LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName()); - // Handle log archiving. - return CompletableFuture.completedFuture(Message - .valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString())); - } +// private CompletableFuture<Message> +// processArchiveLog(LogServiceRequestProto logServiceRequestProto) { +// ArchiveLogRequestProto archiveLog = logServiceRequestProto.getArchiveLog(); +// LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName()); +// // Handle log archiving. +// return CompletableFuture.completedFuture(Message +// .valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString())); +// } private CompletableFuture<Message> processGetStateRequest( LogServiceRequestProto logServiceRequestProto) { @@ -404,48 +387,48 @@ public class LogStateMachine extends BaseStateMachine { .toGetStateReplyProto(state.containsKey(logName)).toByteString())); } - private CompletableFuture<Message> processCreateLogRequest( - LogServiceRequestProto logServiceRequestProto) { - Long val; - LogName name; - try (final AutoCloseableLock writeLock = writeLock()) { - CreateLogRequestProto createLog = logServiceRequestProto.getCreateLog(); - name = LogServiceProtoUtil.toLogName(createLog.getLogName()); - val = state.get(name); - if (val == null) { - val = new Long(0); - } - state.put(name, val); - } - //TODO This can't be part of a state machine (REMOVE) - return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil - .toCreateLogReplyProto( - new LogStreamImpl(name, null, new LogServiceConfiguration())).toByteString())); - } - - //TODO REMOVE this code - private CompletableFuture<Message> processListLogsRequest() { - List<LogStream> logStreams = new ArrayList<LogStream>(state.size()); - for (Entry<LogName, Long> e : state.entrySet()) { - logStreams.add(new LogStreamImpl(e.getKey(), null, new LogServiceConfiguration())); - } - return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil - .toListLogLogsReplyProto(logStreams).toByteString())); - } +// private CompletableFuture<Message> processCreateLogRequest( +// LogServiceRequestProto logServiceRequestProto) { +// Long val; +// LogName name; +// try (final AutoCloseableLock writeLock = writeLock()) { +// CreateLogRequestProto createLog = logServiceRequestProto.getCreateLog(); +// name = LogServiceProtoUtil.toLogName(createLog.getLogName()); +// val = state.get(name); +// if (val == null) { +// val = new Long(0); +// } +// state.put(name, val); +// } +// //TODO This can't be part of a state machine (REMOVE) +// return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil +// .toCreateLogReplyProto( +// new LogStreamImpl(name, null, new LogServiceConfiguration())).toByteString())); +// } + +// //TODO REMOVE this code +// private CompletableFuture<Message> processListLogsRequest() { +// List<LogStream> logStreams = new ArrayList<LogStream>(state.size()); +// for (Entry<LogName, Long> e : state.entrySet()) { +// logStreams.add(new LogStreamImpl(e.getKey(), null, new LogServiceConfiguration())); +// } +// return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil +// .toListLogLogsReplyProto(logStreams).toByteString())); +// } //TODO REMOVE this code - private CompletableFuture<Message> processGetLogRequest( - LogServiceRequestProto logServiceRequestProto) { - GetLogRequestProto getLog = logServiceRequestProto.getGetLog(); - LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName()); - if (state.containsKey(logName)) { - return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil - .toGetLogReplyProto(new LogStreamImpl(logName, null, new LogServiceConfiguration())) - .toByteString())); - } else { - return CompletableFuture.completedFuture(Message.valueOf(GetLogReplyProto.newBuilder() - .build().toByteString())); - } - } +// private CompletableFuture<Message> processGetLogRequest( +// LogServiceRequestProto logServiceRequestProto) { +// GetLogRequestProto getLog = logServiceRequestProto.getGetLog(); +// LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName()); +// if (state.containsKey(logName)) { +// return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil +// .toGetLogReplyProto(new LogStreamImpl(logName, null, new LogServiceConfiguration())) +// .toByteString())); +// } else { +// return CompletableFuture.completedFuture(Message.valueOf(GetLogReplyProto.newBuilder() +// .build().toByteString())); +// } +// } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java new file mode 100644 index 0000000..8ceaafb --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.client; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.logservice.LogServiceFactory; +import org.apache.ratis.logservice.api.LogInfo; +import org.apache.ratis.logservice.api.LogName; +import org.apache.ratis.logservice.api.LogService; +import org.apache.ratis.logservice.common.Constants; +import org.apache.ratis.logservice.proto.MetaServiceProtos.*; +import org.apache.ratis.logservice.util.MetaServiceProtoUtil; +import org.apache.ratis.protocol.*; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.ratis.logservice.util.LogServiceUtils.getPeersFromQuorum; + + +/** + * LogServiceClient is responsible for all meta service communications such as create/get/list logs. + * Initialized by the metaQuorum string that has list of masters as "server:port' separated by a comma. + * An example: 'server1.example.com:9999,server2.example.com:9999,server3.example.com:9999 + */ + +public class LogServiceClient implements AutoCloseable { + + + // the raft client for meta quorum. All DML operations are going using this client. + final private RaftClient client; + + + /** + * Constuctor. Build raft client for meta quorum + * @param metaQuorum + */ + public LogServiceClient(String metaQuorum) { + Set<RaftPeer> peers = getPeersFromQuorum(metaQuorum); + RaftProperties properties = new RaftProperties(); + RaftGroup meta = RaftGroup.valueOf(Constants.metaGroupID, peers); + client = RaftClient.newBuilder() + .setRaftGroup(meta) + .setClientId(ClientId.randomId()) + .setProperties(properties) + .build(); + } + + /** + * Create a new Log request. + * @param logName the name of the log to create + * @return + * @throws IOException + */ + public LogService createLog(LogName logName) throws IOException { + RaftClientReply reply = client.sendReadOnly( + () -> MetaServiceProtoUtil.toCreateLogRequestProto(logName).toByteString()); + CreateLogReplyProto message = CreateLogReplyProto.parseFrom(reply.getMessage().getContent()); + if (message.hasException()) { + throw MetaServiceProtoUtil.toMetaServiceException(message.getException()); + } + LogInfo info = MetaServiceProtoUtil.toLogInfo(message.getLog()); + return LogServiceFactory.getInstance().createLogService(getRaftClient(info), null); + } + + /** + * Get log request. + * @param logName the name of the log to get + * @return + * @throws IOException + */ + public LogService getLog(LogName logName) throws IOException { + RaftClientReply reply = client.sendReadOnly + (() -> MetaServiceProtoUtil.toGetLogRequestProto(logName).toByteString()); + GetLogReplyProto message = GetLogReplyProto.parseFrom(reply.getMessage().getContent()); + if(message.hasException()) { + throw MetaServiceProtoUtil.toMetaServiceException(message.getException()); + } + LogInfo info = MetaServiceProtoUtil.toLogInfo(message.getLog()); + return LogServiceFactory.getInstance().createLogService(getRaftClient(info), null); + } + + + public void deleteLog(LogName logName) throws IOException { + RaftClientReply reply = client.sendReadOnly + (() -> MetaServiceProtoUtil.toDeleteLogRequestProto(logName).toByteString()); + DeleteLogReplyProto message = DeleteLogReplyProto.parseFrom(reply.getMessage().getContent()); + if(message.hasException()) { + throw MetaServiceProtoUtil.toMetaServiceException(message.getException()); + } + } + + /** + * Return the list of available logs + * @return + * @throws IOException + */ + public List<LogInfo> listLogs() throws IOException { + RaftClientReply reply = client.sendReadOnly + (() -> MetaServiceProtoUtil.toListLogRequestProto().toByteString()); + ListLogsReplyProto message = ListLogsReplyProto.parseFrom(reply.getMessage().getContent()); + List<LogInfoProto> infoProtos = message.getLogsList(); + List<LogInfo> infos = infoProtos.stream() + .map(proto -> MetaServiceProtoUtil.toLogInfo(proto)) + .collect(Collectors.toList()); + return infos; + } + + @Override + public void close() throws Exception { + client.close(); + } + + // Internal methods + + /** + * Build a raft client for the particular log. Temporary here. TODO: Should be moved to LogService part + * @param logInfo + * @return + */ + private RaftClient getRaftClient(LogInfo logInfo) { + RaftProperties properties = new RaftProperties(); + return RaftClient.newBuilder().setRaftGroup(logInfo.getRaftGroup()).setProperties(properties).build(); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java new file mode 100644 index 0000000..ba50154 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.common; + +import org.apache.ratis.protocol.RaftGroupId; + +import java.util.UUID; + +public class Constants { + + final public static RaftGroupId metaGroupID = RaftGroupId.valueOf(new UUID(0,1)); + + final public static RaftGroupId serversGroupID = RaftGroupId.valueOf(new UUID(0,2)); + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogAlreadyExistException.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogAlreadyExistException.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogAlreadyExistException.java new file mode 100644 index 0000000..74a77ba --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogAlreadyExistException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.common; + +import java.io.IOException; + +public class LogAlreadyExistException extends IOException { + public LogAlreadyExistException(String name) { + super(name); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogNotFoundException.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogNotFoundException.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogNotFoundException.java new file mode 100644 index 0000000..f9d672a --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogNotFoundException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.common; + +import java.io.IOException; + +public class LogNotFoundException extends IOException { + + public LogNotFoundException(String logName) { + super(logName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/NoEnoughWorkersException.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/NoEnoughWorkersException.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/NoEnoughWorkersException.java new file mode 100644 index 0000000..163b771 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/NoEnoughWorkersException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.common; + +import java.io.IOException; + +public class NoEnoughWorkersException extends IOException { + + public NoEnoughWorkersException(String message) { + super(message); + } + + public NoEnoughWorkersException(int available) { + this("No enough Workers to create a new Log. Currently available workers: " + available); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java index ae4f2de..3e1ea4d 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java @@ -26,9 +26,8 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.logservice.api.LogReader; import org.apache.ratis.logservice.api.LogServiceConfiguration; import org.apache.ratis.logservice.api.LogStream; +import org.apache.ratis.logservice.proto.LogServiceProtos.*; import org.apache.ratis.logservice.util.LogServiceProtoUtil; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException; -import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogReplyProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java index 681a4ab..613ec5e 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java @@ -28,16 +28,8 @@ import org.apache.ratis.logservice.api.LogServiceConfiguration; import org.apache.ratis.logservice.api.LogStream; import org.apache.ratis.logservice.api.LogStream.State; import org.apache.ratis.logservice.api.RecordListener; +import org.apache.ratis.logservice.proto.LogServiceProtos.*; import org.apache.ratis.logservice.util.LogServiceProtoUtil; -import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamState; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; @@ -53,32 +45,33 @@ public class LogServiceImpl implements LogService { @Override public LogStream createLog(LogName name) throws IOException { - RaftClientReply reply = - raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name) - .toByteString())); - CreateLogReplyProto parseFrom = CreateLogReplyProto.parseFrom(reply.getMessage().getContent()); - return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this); +// RaftClientReply reply = +// raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name) +// .toByteString())); +// CreateLogReplyProto parseFrom = CreateLogReplyProto.parseFrom(reply.getMessage().getContent()); +// return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this); + return new LogStreamImpl(name, this); } - - @Override public LogStream getLog(LogName name) throws IOException { - RaftClientReply reply = - raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name) - .toByteString())); - GetLogReplyProto parseFrom = GetLogReplyProto.parseFrom(reply.getMessage().getContent()); - return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this); +// RaftClientReply reply = +// raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name) +// .toByteString())); +// GetLogReplyProto parseFrom = GetLogReplyProto.parseFrom(reply.getMessage().getContent()); +// return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this); + return null; } @Override public Iterator<LogStream> listLogs() throws IOException { - RaftClientReply reply = - raftClient - .sendReadOnly(Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString())); - ListLogsReplyProto parseFrom = ListLogsReplyProto.parseFrom(reply.getMessage().getContent()); - List<LogStreamProto> logStremsList = parseFrom.getLogStremsList(); - return LogServiceProtoUtil.toListLogStreams(logStremsList, this).iterator(); +// RaftClientReply reply = +// raftClient +// .sendReadOnly(Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString())); +// ListLogsReplyProto parseFrom = ListLogsReplyProto.parseFrom(reply.getMessage().getContent()); +// List<LogStreamProto> logStremsList = parseFrom.getLogStremsList(); +// return LogServiceProtoUtil.toListLogStreams(logStremsList, this).iterator(); + return null; } @Override @@ -100,19 +93,19 @@ public class LogServiceImpl implements LogService { @Override public void archiveLog(LogName name) throws IOException { - RaftClientReply reply = - raftClient.send(Message.valueOf(LogServiceProtoUtil.toArchiveLogRequestProto(name) - .toByteString())); - ArchiveLogReplyProto parseFrom = - ArchiveLogReplyProto.parseFrom(reply.getMessage().getContent()); +// RaftClientReply reply = +// raftClient.send(Message.valueOf(LogServiceProtoUtil.toArchiveLogRequestProto(name) +// .toByteString())); +// ArchiveLogReplyProto parseFrom = +// ArchiveLogReplyProto.parseFrom(reply.getMessage().getContent()); } @Override public void deleteLog(LogName name) throws IOException { - RaftClientReply reply = - raftClient.send(Message.valueOf(LogServiceProtoUtil.toDeleteLogRequestProto(name) - .toByteString())); - DeleteLogReplyProto parseFrom = DeleteLogReplyProto.parseFrom(reply.getMessage().getContent()); +// RaftClientReply reply = +// raftClient.send(Message.valueOf(LogServiceProtoUtil.toDeleteLogRequestProto(name) +// .toByteString())); +// DeleteLogReplyProto parseFrom = DeleteLogReplyProto.parseFrom(reply.getMessage().getContent()); } @@ -137,11 +130,12 @@ public class LogServiceImpl implements LogService { @Override public LogStream createLog(LogName name, LogServiceConfiguration config) throws IOException { //TODO configuration - RaftClientReply reply = - raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name) - .toByteString())); - CreateLogReplyProto parseFrom = CreateLogReplyProto.parseFrom(reply.getMessage().getContent()); - return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this); +// RaftClientReply reply = +// raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name) +// .toByteString())); +// CreateLogReplyProto parseFrom = CreateLogReplyProto.parseFrom(reply.getMessage().getContent()); +// return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this); + return null; } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java index 06a28f5..2c7feac 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java @@ -29,7 +29,7 @@ import org.apache.ratis.logservice.api.LogStream; import org.apache.ratis.logservice.api.LogServiceConfiguration; import org.apache.ratis.logservice.api.LogWriter; import org.apache.ratis.logservice.api.RecordListener; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto; +import org.apache.ratis.logservice.proto.LogServiceProtos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +62,7 @@ public class LogStreamImpl implements LogStream { */ long length; - public LogStreamImpl(LogStreamProto proto, LogService service) { + public LogStreamImpl(LogServiceProtos.LogStreamProto proto, LogService service) { this.service = service; this.name = LogName.of(proto.getLogName().getName()); this.config = service.getConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java index 92082ab..da19e70 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java @@ -21,15 +21,14 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; import org.apache.ratis.client.RaftClient; import org.apache.ratis.logservice.api.LogServiceConfiguration; import org.apache.ratis.logservice.api.LogStream; import org.apache.ratis.logservice.api.LogWriter; import org.apache.ratis.logservice.util.LogServiceProtoUtil; -import org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException; -import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogReplyProto; +import org.apache.ratis.logservice.proto.LogServiceProtos.*; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.slf4j.Logger; @@ -62,9 +61,16 @@ public class LogWriterImpl implements LogWriter { List<ByteBuffer> list = new ArrayList<ByteBuffer>(); list.add(data); RaftClientReply reply = - raftClient.send(Message.valueOf(LogServiceProtoUtil - .toAppendBBEntryLogRequestProto(parent.getName(), list) - .toByteString())); + null; + try { + reply = raftClient.sendAsync(Message.valueOf(LogServiceProtoUtil + .toAppendBBEntryLogRequestProto(parent.getName(), list) + .toByteString())).get(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } AppendLogEntryReplyProto proto = AppendLogEntryReplyProto.parseFrom(reply.getMessage().getContent()); if (proto.hasException()) { LogServiceException e = proto.getException(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ManagementStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ManagementStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ManagementStateMachine.java new file mode 100644 index 0000000..2b018bb --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ManagementStateMachine.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.server; + +import org.apache.ratis.statemachine.impl.BaseStateMachine; + + +/** + * This is the statemachine for the default group serversGroupID. At the moment it's empty, but would + * have some logic related to the heartbeat functionality. + */ +public class ManagementStateMachine extends BaseStateMachine { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java new file mode 100644 index 0000000..a0157c3 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.server; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.logservice.common.Constants; +import org.apache.ratis.logservice.util.LogServiceUtils; +import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.protocol.*; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.LifeCycle; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.net.*; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Set; + +import static org.apache.ratis.logservice.common.Constants.metaGroupID; +import static org.apache.ratis.logservice.util.LogServiceUtils.getPeersFromQuorum; + +/** + * Master quorum is responsible for tracking all available quorum members + */ +public class MasterServer implements Closeable { + + + // RaftServer internal server. Has meta raft group and MetaStateMachine + private RaftServer server; + + private String id; + + private String host; + + @Parameter(names = "-port", description = "Port number") + private int port = 9999; + + private String workingDir = null; + + private StateMachine metaStateMachine; + + private LifeCycle lifeCycle; + + public MasterServer(String hostname, int port, String workingDir) { + this.port = port; + this.host = hostname; + this.workingDir = workingDir; + id = host + "_" + port; + this.lifeCycle = new LifeCycle(this.id); + + } + + public MasterServer() { + + } + + public void start(String metaGroupId) throws IOException { + if (host == null) { + host = LogServiceUtils.getHostName(); + } + this.lifeCycle = new LifeCycle(this.id); + RaftProperties properties = new RaftProperties(); + if(workingDir != null) { + RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(workingDir))); + } + GrpcConfigKeys.Server.setPort(properties, port); + NettyConfigKeys.Server.setPort(properties, port); + Set<RaftPeer> peers = getPeersFromQuorum(metaGroupId); + RaftGroup metaGroup = RaftGroup.valueOf(Constants.metaGroupID, peers); + metaStateMachine = new MetaStateMachine(); + server = RaftServer.newBuilder() + .setGroup(metaGroup) + .setServerId(RaftPeerId.valueOf(id)) + .setStateMachineRegistry(raftGroupId -> { + if(raftGroupId.equals(metaGroupID)) { + return metaStateMachine; + } + return null; + }) + .setProperties(properties).build(); + lifeCycle.startAndTransition(() -> { + server.start(); + }, IOException.class); + } + + public static void main(String[] args) throws IOException { + MasterServer master = new MasterServer(); + JCommander.newBuilder() + .addObject(master) + .build() + .parse(args); + master.start(null); + + + } + public static MasterServer.Builder newBuilder() { + return new MasterServer.Builder(); + } + + @Override + public void close() throws IOException { + server.close(); + } + + public String getId() { + return id; + } + + public String getAddress() { + return host + ":" + port; + } + + public void cleanUp() throws IOException { + FileUtils.deleteFully(new File(workingDir)); + } + + public static class Builder { + private String host = null; + private int port = 9999; + private String workingDir = null; + + /** + * @return a {@link MasterServer} object. + */ + public MasterServer build() { + if (host == null) { + host = LogServiceUtils.getHostName(); + } + return new MasterServer(host, port, workingDir); + } + + /** + * Set the server hostname. + */ + public Builder setHost(String host) { + this.host = host; + return this; + } + + /** + * Set server port + */ + public Builder setPort(int port) { + this.port = port; + return this; + } + + public Builder setWorkingDir(String workingDir) { + this.workingDir = workingDir; + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java new file mode 100644 index 0000000..ab614d9 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java @@ -0,0 +1,371 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.server; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.logservice.api.LogName; +import org.apache.ratis.logservice.api.LogInfo; +import org.apache.ratis.logservice.common.LogAlreadyExistException; +import org.apache.ratis.logservice.common.LogNotFoundException; +import org.apache.ratis.logservice.common.NoEnoughWorkersException; +import org.apache.ratis.logservice.proto.MetaServiceProtos; +import org.apache.ratis.logservice.proto.MetaServiceProtos.*; +import org.apache.ratis.logservice.util.LogServiceProtoUtil; +import org.apache.ratis.logservice.util.MetaServiceProtoUtil; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.protocol.*; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.ratis.util.AutoCloseableLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; + +import static org.apache.ratis.logservice.common.Constants.metaGroupID; +import static org.apache.ratis.logservice.common.Constants.serversGroupID; + +/** + * State Machine serving meta data for LogService. It persists the pairs 'log name' -> RaftGroup + * During the start basing on the persisted data it would be able to build a list of the existing servers. + * Requests from clients for DDL operations are handled by query mechanism (so only the leader accept that. + * It performs the operation (such as Log creation) and sends a message with the log -> group pair to itself + * to persis this data internally and on followers. + */ + +public class MetaStateMachine extends BaseStateMachine { + + Logger LOG = LoggerFactory.getLogger(MetaStateMachine.class); + + + //Persisted map between log and RaftGroup + private Map<LogName, RaftGroup> map = new ConcurrentHashMap<>(); + // List of the currently known peers. + private final Set<RaftPeer> peers = new HashSet(); + + // keep a copy of raftServer to get group information. + private RaftServer raftServer; + + + private RaftGroup currentGroup = null; + + // MinHeap queue for load balancing groups across the peers + private PriorityBlockingQueue<PeerGroups> avail = new PriorityBlockingQueue<PeerGroups>(); + + //Properties + private RaftProperties properties = new RaftProperties(); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + + @Override + public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage storage) throws IOException { + this.raftServer = server; + super.initialize(server, groupId, storage); + } + + @Override + public TransactionContext applyTransactionSerial(TransactionContext trx) { + RaftProtos.LogEntryProto x = trx.getLogEntry(); + MetaSMRequestProto req = null; + try { + req = MetaSMRequestProto.parseFrom(x.getStateMachineLogEntry().getLogData()); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + switch (req.getTypeCase()) { + case REGISTERREQUEST: + LogServiceRegisterLogRequestProto r = req.getRegisterRequest(); + LogName logname = LogServiceProtoUtil.toLogName(r.getLogname()); + RaftGroup rg = MetaServiceProtoUtil.toRaftGroup(r.getRaftGroup()); + map.put(logname, rg); + LOG.info("Log {} registered at {} with group {} ", logname, getId(), rg ); + break; + case UNREGISTERREQUEST: + LogServiceUnregisterLogRequestProto unregReq = req.getUnregisterRequest(); + logname = LogServiceProtoUtil.toLogName(unregReq.getLogname()); + map.remove(logname); + break; + case PINGREQUEST: + LogServicePingRequestProto pingRequest = req.getPingRequest(); + RaftPeer peer = MetaServiceProtoUtil.toRaftPeer(pingRequest.getPeer()); + if (peers.contains(peer)) { + //Do Nothing, that's just heartbeat + } else { + peers.add(peer); + avail.add(new PeerGroups(peer)); + } + break; + + default: + } + return super.applyTransactionSerial(trx); + } + + @Override + public TransactionContext startTransaction(RaftClientRequest request) throws IOException { + return super.startTransaction(request); + } + + @Override + public TransactionContext preAppendTransaction(TransactionContext trx) throws IOException { + return super.preAppendTransaction(trx); + } + + @Override + public CompletableFuture<Message> queryStale(Message request, long minIndex) { + return super.queryStale(request, minIndex); + } + + @Override + public CompletableFuture<Message> applyTransaction(TransactionContext trx) { + return super.applyTransaction(trx); + } + + @Override + public CompletableFuture<Message> query(Message request) { + if (currentGroup == null) { + try { + List<RaftGroup> x = StreamSupport.stream(raftServer.getGroups().spliterator(), false).filter(group -> group.getGroupId().equals(metaGroupID)).collect(Collectors.toList()); + if (x.size() == 1) { + currentGroup = x.get(0); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + RaftProperties properties = new RaftProperties(); + MetaServiceProtos.MetaServiceRequestProto req = null; + try { + req = MetaServiceProtos.MetaServiceRequestProto.parseFrom(request.getContent()); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + MetaServiceProtos.MetaServiceRequestProto.TypeCase type = req.getTypeCase(); + switch (type) { + + case CREATELOG: + return processCreateLogRequest(req); + case LISTLOGS: + return processListLogsRequest(); + case GETLOG: + return processGetLogRequest(req); + case ARCHIVELOG: + return processArchiveLog(req); + case DELETELOG: + return processDeleteLog(req); + default: + } + CompletableFuture<Message> reply = super.query(request); + return reply; + } + + + + private CompletableFuture<Message> + processDeleteLog(MetaServiceProtos.MetaServiceRequestProto logServiceRequestProto) { + DeleteLogRequestProto deleteLog = logServiceRequestProto.getDeleteLog(); + LogName logName = LogServiceProtoUtil.toLogName(deleteLog.getLogName()); + RaftGroup raftGroup = map.get(logName); + if (raftGroup == null) { + return CompletableFuture.completedFuture(Message.valueOf( + MetaServiceProtoUtil.toDeleteLogExceptionReplyProto( + new LogNotFoundException(logName.getName())).build().toByteString())); + } else { + Collection<RaftPeer> peers = raftGroup.getPeers(); + peers.stream().forEach(peer -> { + RaftClient client = RaftClient.newBuilder() + .setProperties(properties) + .setClientId(ClientId.randomId()) + .setRaftGroup(RaftGroup.valueOf(serversGroupID, peer)) + .build(); + try { + client.groupRemove(raftGroup.getGroupId(), true, peer.getId()); + } catch (IOException e) { + e.printStackTrace(); + } + }); + RaftClient client = RaftClient.newBuilder() + .setRaftGroup(currentGroup) + .setClientId(ClientId.randomId()) + .setProperties(properties) + .build(); + try { + client.send(() -> MetaServiceProtos.MetaSMRequestProto.newBuilder() + .setUnregisterRequest( + LogServiceUnregisterLogRequestProto.newBuilder() + .setLogname(LogServiceProtoUtil.toLogNameProto(logName))) + .build().toByteString()); + } catch (IOException e) { + e.printStackTrace(); + } + } + return CompletableFuture.completedFuture(Message.valueOf( + MetaServiceProtoUtil.toDeleteLogReplyProto().toByteString())); + } + +// private CompletableFuture<Message> processCloseLog(MetaServiceProtos.MetaServiceRequestProto logServiceRequestProto) { +// CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog(); +// LogName logName = LogServiceProtoUtil.toLogName(closeLog.getLogName()); +// // Need to check whether the file is opened if opened close it. +// // TODO need to handle exceptions while operating with files. +// return CompletableFuture.completedFuture(Message +// .valueOf(CloseLogReplyProto.newBuilder().build().toByteString())); +// } + + private CompletableFuture<Message> + processArchiveLog(MetaServiceProtos.MetaServiceRequestProto logServiceRequestProto) { + ArchiveLogRequestProto archiveLog = logServiceRequestProto.getArchiveLog(); + LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName()); + // Handle log archiving. + return CompletableFuture.completedFuture(Message + .valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString())); + } + +// private CompletableFuture<Message> processGetStateRequest( +// MetaServiceProtos.MetaServiceRequestProto logServiceRequestProto) { +// MetaServiceProtos.GetStateRequestProto getState = logServiceRequestProto.getGetState(); +// LogName logName = LogServiceProtoUtil.toLogName(getState.getLogName()); +// return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil +// .toGetStateReplyProto(true).toByteString())); +// } +// + private CompletableFuture<Message> processCreateLogRequest( + MetaServiceProtos.MetaServiceRequestProto logServiceRequestProto) { + LogName name; + try (final AutoCloseableLock writeLock = writeLock()) { + CreateLogRequestProto createLog = logServiceRequestProto.getCreateLog(); + name = LogServiceProtoUtil.toLogName(createLog.getLogName()); + if(map.containsKey(name)) { + return CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil + .toCreateLogExceptionReplyProto(new LogAlreadyExistException(name.getName())) + .build() + .toByteString())); + } + // Check that we have at least 3 nodes + if (avail.size() < 3) { + return CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil + .toCreateLogExceptionReplyProto(new NoEnoughWorkersException(avail.size())) + .build() + .toByteString())); + } else { + List<PeerGroups> peerGroup = IntStream.range(0, 3).mapToObj(i -> avail.poll()).collect(Collectors.toList()); + List<RaftPeer> peers = peerGroup.stream().map(obj -> obj.getPeer()).collect(Collectors.toList()); + RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peers); + peerGroup.stream().forEach(pg -> { + pg.getGroups().add(raftGroup); + avail.add(pg); + }); + peers.forEach(i -> { + RaftClient client = RaftClient.newBuilder().setProperties(properties).setRaftGroup(RaftGroup.valueOf(serversGroupID, i)).build(); + try { + client.groupAdd(raftGroup, i.getId()); + } catch (IOException e) { + e.printStackTrace(); + } + }); + RaftClient client = RaftClient.newBuilder() + .setRaftGroup(currentGroup) + .setClientId(ClientId.randomId()) + .setProperties(properties) + .build(); + try { + client.send(() -> MetaServiceProtos.MetaSMRequestProto.newBuilder() + .setRegisterRequest(LogServiceRegisterLogRequestProto + .newBuilder() + .setLogname(LogServiceProtoUtil.toLogNameProto(name)) + .setRaftGroup(MetaServiceProtoUtil + .toRaftGroupProto(raftGroup))) + .build().toByteString()); + } catch (IOException e) { + e.printStackTrace(); + } + return CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil + .toCreateLogReplyProto(new LogInfo((name), raftGroup)).build().toByteString())); + } + } + } + + + private AutoCloseableLock writeLock() { + return AutoCloseableLock.acquire(lock.writeLock()); + } + + private CompletableFuture<Message> processListLogsRequest() { + return CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil + .toListLogLogsReplyProto( + map.entrySet() + .stream() + .map(log -> new LogInfo(log.getKey(), log.getValue())) + .collect(Collectors.toList())).toByteString())); + } + + private CompletableFuture<Message> processGetLogRequest( + MetaServiceProtos.MetaServiceRequestProto logServiceRequestProto) { + MetaServiceProtos.GetLogRequestProto getLog = logServiceRequestProto.getGetLog(); + LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName()); + RaftGroup raftGroup = map.get(logName); + if (raftGroup != null) { + return CompletableFuture.completedFuture(Message.valueOf( + MetaServiceProtoUtil.toGetLogReplyProto(new LogInfo(logName, raftGroup)) + .toByteString())); + } else { + return CompletableFuture.completedFuture(Message.valueOf( + MetaServiceProtoUtil.toGetLogExceptionReplyProto( + new LogNotFoundException(logName.getName())).build().toByteString())); + } + } + + + class PeerGroups implements Comparable{ + RaftPeer peer; + Set<RaftGroup> groups = new HashSet<>(); + + public PeerGroups(RaftPeer peer) { + this.peer = peer; + + } + + public Set<RaftGroup> getGroups () { + return groups; + } + + public RaftPeer getPeer() { + return peer; + } + + @Override + public int compareTo(Object o) { + return groups.size() - ((PeerGroups) o).groups.size(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java index 2e1b8da..59037b8 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.ratis.logservice.util; import java.nio.ByteBuffer; @@ -27,131 +28,32 @@ import org.apache.ratis.logservice.api.LogService; import org.apache.ratis.logservice.api.LogStream; import org.apache.ratis.logservice.api.LogStream.State; import org.apache.ratis.logservice.impl.LogStreamImpl; -import org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogStartIndexReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogStartIndexRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto.Builder; -import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogNameProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamState; -import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogRequestProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogReplyProto; -import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogRequestProto; +import org.apache.ratis.logservice.proto.LogServiceProtos; +import org.apache.ratis.logservice.proto.LogServiceProtos.*; +import org.apache.ratis.logservice.proto.MetaServiceProtos.*; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ public class LogServiceProtoUtil { - public static LogServiceRequestProto toCreateLogRequestProto(LogName logName) { - LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); - CreateLogRequestProto createLog = - CreateLogRequestProto.newBuilder().setLogName(logNameProto).build(); - return LogServiceRequestProto.newBuilder().setCreateLog(createLog).build(); - } - - public static LogServiceRequestProto toListLogRequestProto() { - ListLogsRequestProto listLogs = ListLogsRequestProto.newBuilder().build(); - return LogServiceRequestProto.newBuilder().setListLogs(listLogs).build(); - } - - public static LogServiceRequestProto toGetLogRequestProto(LogName name) { - GetLogRequestProto getLog = - GetLogRequestProto.newBuilder().setLogName(toLogNameProto(name)).build(); - return LogServiceRequestProto.newBuilder().setGetLog(getLog).build(); - } - - public static LogServiceRequestProto toCloseLogRequestProto(LogName logName) { - LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); - CloseLogRequestProto closeLog = - CloseLogRequestProto.newBuilder().setLogName(logNameProto).build(); - return LogServiceRequestProto.newBuilder().setCloseLog(closeLog).build(); - } - - public static CloseLogReplyProto toCloseLogReplyProto() { - CloseLogReplyProto.Builder builder = CloseLogReplyProto.newBuilder(); - return builder.build(); - } - - public static LogServiceRequestProto toGetStateRequestProto(LogName logName) { - LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); - GetStateRequestProto getState = - GetStateRequestProto.newBuilder().setLogName(logNameProto).build(); - return LogServiceRequestProto.newBuilder().setGetState(getState).build(); - } - - public static LogServiceRequestProto toArchiveLogRequestProto(LogName logName) { - LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); - ArchiveLogRequestProto archiveLog = - ArchiveLogRequestProto.newBuilder().setLogName(logNameProto).build(); - return LogServiceRequestProto.newBuilder().setArchiveLog(archiveLog).build(); - } - - public static LogServiceRequestProto toDeleteLogRequestProto(LogName logName) { - LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); - DeleteLogRequestProto deleteLog = - DeleteLogRequestProto.newBuilder().setLogName(logNameProto).build(); - return LogServiceRequestProto.newBuilder().setDeleteLog(deleteLog).build(); - } - - public static DeleteLogReplyProto toDeleteLogReplyProto() { - DeleteLogReplyProto.Builder builder = DeleteLogReplyProto.newBuilder(); - return builder.build(); - } public static LogNameProto toLogNameProto(LogName logName) { return LogNameProto.newBuilder().setName(logName.getName()).build(); } - public static LogName toLogName(LogNameProto logNameProto) { + public static LogName toLogName(LogServiceProtos.LogNameProto logNameProto) { return LogName.of(logNameProto.getName()); } public static LogStreamProto toLogStreamProto(LogStream logStream) { LogNameProto logNameProto = - LogNameProto.newBuilder().setName(logStream.getName().getName()).build(); + LogNameProto.newBuilder().setName(logStream.getName().getName()).build(); LogStreamProto logStreamProto = - LogStreamProto - .newBuilder() - .setLogName(logNameProto) - .setSize(logStream.getSize()) - .setState( - logStream.getState().equals(State.OPEN) ? LogStreamState.OPEN : LogStreamState.CLOSED) - .build(); + LogStreamProto + .newBuilder() + .setLogName(logNameProto) + .setSize(logStream.getSize()) + .setState( + logStream.getState().equals(State.OPEN) ? LogStreamState.OPEN : LogStreamState.CLOSED) + .build(); return logStreamProto; } @@ -159,26 +61,19 @@ public class LogServiceProtoUtil { return new LogStreamImpl(logStream, parent); } - public static CreateLogReplyProto toCreateLogReplyProto(LogStream logStream) { - LogNameProto logNameProto = - LogNameProto.newBuilder().setName(logStream.getName().getName()).build(); - LogStreamProto logStreamProto = - LogStreamProto - .newBuilder() - .setLogName(logNameProto) - .setSize(logStream.getSize()) - .setState( - logStream.getState().equals(State.OPEN) ? LogStreamState.OPEN : LogStreamState.CLOSED) - .build(); - return CreateLogReplyProto.newBuilder().setLogStream(logStreamProto).build(); + + public static LogServiceRequestProto toCloseLogRequestProto(LogName logName) { + LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); + CloseLogRequestProto closeLog = + CloseLogRequestProto.newBuilder().setLogName(logNameProto).build(); + return LogServiceRequestProto.newBuilder().setCloseLog(closeLog).build(); } - public static ListLogsReplyProto toListLogLogsReplyProto(List<LogStream> logStreams) { - Builder newBuilder = ListLogsReplyProto.newBuilder(); - for (LogStream stream : logStreams) { - newBuilder.addLogStrems(toLogStreamProto(stream)); - } - return newBuilder.build(); + public static LogServiceRequestProto toGetStateRequestProto(LogName logName) { + LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); + GetStateRequestProto getState = + GetStateRequestProto.newBuilder().setLogName(logNameProto).build(); + return LogServiceRequestProto.newBuilder().setGetState(getState).build(); } public static ArchiveLogReplyProto toArchiveLogReplyProto() { @@ -261,13 +156,8 @@ public class LogServiceProtoUtil { return retVal; } - public static GetLogReplyProto toGetLogReplyProto(LogStream logStream) { - return GetLogReplyProto.newBuilder().setLogStream(toLogStreamProto(logStream)).build(); - } - public static GetStateReplyProto toGetStateReplyProto(boolean exists) { - return GetStateReplyProto.newBuilder() - .setState(exists ? LogStreamState.OPEN : LogStreamState.CLOSED).build(); + return GetStateReplyProto.newBuilder().build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java new file mode 100644 index 0000000..a25a7df --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.util; + +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; + +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketException; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class LogServiceUtils { + + public static Set<RaftPeer> getPeersFromIds(String identity) { + return Stream.of(identity.split(",")).map(elem -> + new RaftPeer(RaftPeerId.valueOf(elem), elem.replace('_', ':')) + ).collect(Collectors.toSet()); + } + + public static Set<RaftPeer> getPeersFromQuorum(String identity) { + return Stream.of(identity.split(",")).map(elem -> + new RaftPeer(RaftPeerId.valueOf(elem.replace(':', '_')), elem) + ).collect(Collectors.toSet()); + } + + public static String getHostName() { + try (DatagramSocket socket = new DatagramSocket()) { + socket.connect(InetAddress.getByName("8.8.8.8"), 10002); + return socket.getLocalAddress().getHostName(); + } catch (Exception e) { + return "localhost"; + } + + } +}
