Murtadha Hubail has submitted this change and it was merged.

Change subject: ASTERIXDB-1425 & ASTERIXDB-1450: Fix LogReader random reads
......................................................................
ASTERIXDB-1425 & ASTERIXDB-1450: Fix LogReader random reads - Fix random reads for truncated logs (ASTERIXDB-1425). - Fix log file partition size boundary check (ASTERIXDB-1450). - Fix deadlock between LogReader and LogFlusher. - Prevent checkpoints from deleting log files being accessed by rollbacks. - Make rollbacks start from LSN = max(txnFirstLSN, minMemoryLSN). - Make default log partition size 256MB instead of 2GB. Change-Id: I1c75ca4a7c8fe197451126392389d4baecbd7e45 Reviewed-on: https://asterix-gerrit.ics.uci.edu/867 Tested-by: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java 6 files changed, 269 insertions(+), 55 deletions(-) Approvals: Ian Maxon: Looks good to me, approved Jenkins: Verified diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java index c4a5e8e..356dad3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java @@ -32,7 +32,7 @@ private static final int TXN_LOG_BUFFER_PAGESIZE_DEFAULT = (128 << 10); // 128KB private static 
final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize"; - private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = ((long) 2 << 30); // 2GB + private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = StorageUtil.getSizeInBytes(256L, StorageUnit.MEGABYTE); private static final String TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY = "txn.log.checkpoint.lsnthreshold"; private static final int TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT = (64 << 20); // 64M diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java index cff4184..97d4897 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.transactions; import java.io.IOException; +import java.nio.channels.FileChannel; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.replication.IReplicationManager; @@ -85,4 +86,23 @@ */ public int getNumLogPages(); + /** + * Opens a file channel to the log file which contains {@code LSN}. + * The start position of the file channel will be at the first LSN of the file. + * + * @param LSN + * @return + * @throws IOException + * if the log file does not exist. + */ + public TxnLogFile getLogFile(long LSN) throws IOException; + + /** + * Closes the log file. 
+ * + * @param logFileRef + * @param fileChannel + * @throws IOException + */ + public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java new file mode 100644 index 0000000..e535206 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.transactions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +public class TxnLogFile { + + private final FileChannel fileChannel; + private final long logFileId; + private final long fileBeginLSN; + private final ILogManager logManager; + private boolean open = true; + + public TxnLogFile(ILogManager logManager, FileChannel fileChannel, long logFileId, long fileBeginLSN) { + this.logManager = logManager; + this.fileChannel = fileChannel; + this.logFileId = logFileId; + this.fileBeginLSN = fileBeginLSN; + } + + public void position(long newPosition) throws IOException { + fileChannel.position(newPosition); + } + + public long size() throws IOException { + return fileChannel.size(); + } + + public int read(ByteBuffer readBuffer) throws IOException { + return fileChannel.read(readBuffer); + } + + public long getLogFileId() { + return logFileId; + } + + public synchronized void close() throws IOException { + if (open) { + logManager.closeLogFile(this, fileChannel); + open = false; + } + } + + public long getFileBeginLSN() { + return fileBeginLSN; + } +} diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index f10520b..be0435a 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -47,6 +48,7 @@ import 
org.apache.asterix.common.transactions.LogManagerProperties; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.transactions.MutableLong; +import org.apache.asterix.common.transactions.TxnLogFile; import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; @@ -74,6 +76,7 @@ private final String nodeId; protected LinkedBlockingQueue<ILogRecord> flushLogsQ; private final FlushLogsLogger flushLogsLogger; + private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>(); public LogManager(TransactionSubsystem txnSubsystem) { this.txnSubsystem = txnSubsystem; @@ -148,7 +151,13 @@ "Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record."); } } - if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() > logFileSize) { + + /** + * To eliminate the case where the modulo of the next appendLSN = 0 (the next + * appendLSN = the first LSN of the next log file), we do not allow a log to be + * written at the last offset of the current file. + */ + if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() >= logFileSize) { prepareNextLogFile(); appendPage.isFull(true); getAndInitNewPage(); @@ -194,8 +203,28 @@ } protected void prepareNextLogFile() { + //wait until all log records have been flushed in the current file + synchronized (flushLSN) { + while (flushLSN.get() != appendLSN.get()) { + //notification will come from LogBuffer.internalFlush(.) 
+ try { + flushLSN.wait(); + } catch (InterruptedException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe("Preparing new log file was interrupted"); + } + Thread.currentThread().interrupt(); + } + } + } + //move appendLSN and flushLSN to the first LSN of the next log file appendLSN.addAndGet(logFileSize - getLogFileOffset(appendLSN.get())); + flushLSN.set(appendLSN.get()); appendChannel = getFileChannel(appendLSN.get(), true); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Created new txn log file with id(" + getLogFileId(appendLSN.get()) + ") starting with LSN = " + + appendLSN.get()); + } appendPage.isLastPage(true); //[Notice] //the current log file channel is closed if @@ -323,15 +352,32 @@ @Override public void deleteOldLogFiles(long checkpointLSN) { - Long checkpointLSNLogFileID = getLogFileId(checkpointLSN); List<Long> logFileIds = getLogFileIds(); if (logFileIds != null) { - for (Long id : logFileIds) { - if (id < checkpointLSNLogFileID) { + //sort log files from oldest to newest + Collections.sort(logFileIds); + /** + * At this point, any future LogReader should read from LSN >= checkpointLSN + */ + synchronized (txnLogFileId2ReaderCount) { + for (Long id : logFileIds) { + /** + * Stop deletion if: + * The log file which contains the checkpointLSN has been reached. + * The oldest log file being accessed by a LogReader has been reached. 
+ */ + if (id >= checkpointLSNLogFileID + || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) { + break; + } + + //delete old log file File file = new File(getLogFilePath(id)); - if (!file.delete()) { - throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath()); + file.delete(); + txnLogFileId2ReaderCount.remove(id); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Deleted log file " + file.getAbsolutePath()); } } } @@ -365,6 +411,7 @@ throw new IllegalStateException("Failed to close a fileChannel of a log file"); } } + txnLogFileId2ReaderCount.clear(); List<Long> logFileIds = getLogFileIds(); if (logFileIds != null) { for (Long id : logFileIds) { @@ -434,7 +481,7 @@ return (new File(path)).mkdir(); } - public FileChannel getFileChannel(long lsn, boolean create) { + private FileChannel getFileChannel(long lsn, boolean create) { FileChannel newFileChannel = null; try { long fileId = getLogFileId(lsn); @@ -496,6 +543,55 @@ return numLogPages; } + @Override + public TxnLogFile getLogFile(long LSN) throws IOException { + long fileId = getLogFileId(LSN); + String logFilePath = getLogFilePath(fileId); + File file = new File(logFilePath); + if (!file.exists()) { + throw new IOException("Log file with id(" + fileId + ") was not found. 
Requested LSN: " + LSN); + } + RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "r"); + FileChannel newFileChannel = raf.getChannel(); + TxnLogFile logFile = new TxnLogFile(this, newFileChannel, fileId, fileId * logFileSize); + touchLogFile(fileId); + return logFile; + } + + @Override + public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException { + if (!fileChannel.isOpen()) { + throw new IllegalStateException("File channel is not open"); + } + fileChannel.close(); + untouchLogFile(logFileRef.getLogFileId()); + } + + private void touchLogFile(long fileId) { + synchronized (txnLogFileId2ReaderCount) { + if (txnLogFileId2ReaderCount.containsKey(fileId)) { + txnLogFileId2ReaderCount.put(fileId, txnLogFileId2ReaderCount.get(fileId) + 1); + } else { + txnLogFileId2ReaderCount.put(fileId, 1); + } + } + } + + private void untouchLogFile(long fileId) { + synchronized (txnLogFileId2ReaderCount) { + if (txnLogFileId2ReaderCount.containsKey(fileId)) { + int newReaderCount = txnLogFileId2ReaderCount.get(fileId) - 1; + if (newReaderCount < 0) { + throw new IllegalStateException( + "Invalid log file reader count (ID=" + fileId + ", count: " + newReaderCount + ")"); + } + txnLogFileId2ReaderCount.put(fileId, newReaderCount); + } else { + throw new IllegalStateException("Trying to close log file id(" + fileId + ") which was not opened."); + } + } + } + /** * This class is used to log FLUSH logs. 
* FLUSH logs are flushed on a different thread to avoid a possible deadlock in LogBuffer batchUnlock which calls PrimaryIndexOpeartionTracker.completeOperation diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java index 0b1d320..1592aba 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java @@ -20,15 +20,16 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.util.logging.Logger; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.ILogReader; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.ILogRecord.RecordReadStatus; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.MutableLong; +import org.apache.asterix.common.transactions.TxnLogFile; /** * NOTE: Many method calls of this class are not thread safe. 
@@ -38,7 +39,7 @@ public static final boolean IS_DEBUG_MODE = false;//true private static final Logger LOGGER = Logger.getLogger(LogReader.class.getName()); - private final LogManager logMgr; + private final ILogManager logMgr; private final long logFileSize; private final int logPageSize; private final MutableLong flushLSN; @@ -48,14 +49,15 @@ private long readLSN; private long bufferBeginLSN; private long fileBeginLSN; - private FileChannel fileChannel; + private TxnLogFile logFile; private enum ReturnState { FLUSH, EOF }; - public LogReader(LogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN, boolean isRecoveryMode) { + public LogReader(ILogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN, + boolean isRecoveryMode) { this.logMgr = logMgr; this.logFileSize = logFileSize; this.logPageSize = logPageSize; @@ -71,12 +73,13 @@ if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) { return; } - getFileChannel(); + getLogFile(); fillLogReadBuffer(); } /** * Get the next log record from the log file. + * * @return A deserialized log record, or null if we have reached the end of the file. * @throws ACIDException */ @@ -119,11 +122,14 @@ continue; } case BAD_CHKSUM: { - LOGGER.severe("Transaction log contains corrupt log records (perhaps due to medium error). Stopping recovery early."); + LOGGER.severe( + "Transaction log contains corrupt log records (perhaps due to medium error). 
Stopping recovery early."); return null; } case OK: break; + default: + throw new IllegalStateException("Unexpected log read status: " + status); } // break the loop by default @@ -136,14 +142,14 @@ private ReturnState waitForFlushOrReturnIfEOF() { synchronized (flushLSN) { - while (readLSN >= flushLSN.get()) { + while (readLSN > flushLSN.get()) { if (isRecoveryMode) { return ReturnState.EOF; } try { if (IS_DEBUG_MODE) { - LOGGER.info("waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: " - + readLSN); + LOGGER.info( + "waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: " + readLSN); } flushLSN.wait(); } catch (InterruptedException e) { @@ -156,15 +162,16 @@ /** * Continues log analysis between log file splits. + * * @return true if log continues, false if EOF * @throws ACIDException */ private boolean refillLogReadBuffer() throws ACIDException { try { - if (readLSN % logFileSize == fileChannel.size()) { - fileChannel.close(); + if (readLSN % logFileSize == logFile.size()) { + logFile.close(); readLSN += logFileSize - (readLSN % logFileSize); - getFileChannel(); + getLogFile(); } return fillLogReadBuffer(); } catch (IOException e) { @@ -174,6 +181,7 @@ /** * Fills the log buffer with data from the log file at the current position + * * @return false if EOF, true otherwise * @throws ACIDException */ @@ -183,17 +191,17 @@ } private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) throws ACIDException { - int size=0; - int read=0; + int size = 0; + int read = 0; readBuffer.position(0); readBuffer.limit(readSize); try { - fileChannel.position(readLSN % logFileSize); + logFile.position(readLSN % logFileSize); //We loop here because read() may return 0, but this simply means we are waiting on IO. //Therefore we want to break out only when either the buffer is full, or we reach EOF. 
- while( size < readSize && read != -1) { - read = fileChannel.read(readBuffer); - if(read>0) { + while (size < readSize && read != -1) { + read = logFile.read(readBuffer); + if (read > 0) { size += read; } } @@ -202,7 +210,7 @@ } readBuffer.position(0); readBuffer.limit(size); - if(size == 0 && read == -1){ + if (size == 0 && read == -1) { return false; //EOF } bufferBeginLSN = readLSN; @@ -213,38 +221,37 @@ @Override public ILogRecord read(long LSN) throws ACIDException { readLSN = LSN; + //wait for the log to be flushed if needed before trying to read it. synchronized (flushLSN) { - while (readLSN >= flushLSN.get()) { + while (readLSN > flushLSN.get()) { try { flushLSN.wait(); } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); } } } try { - if (fileChannel == null) { - getFileChannel(); + if (logFile == null) { + //get the log file which contains readLSN + getLogFile(); fillLogReadBuffer(); - } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + fileChannel.size()) { - fileChannel.close(); - getFileChannel(); + } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) { + //log is not in the current log file + logFile.close(); + getLogFile(); fillLogReadBuffer(); } else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + readBuffer.limit()) { + //log is not in the current read buffer fillLogReadBuffer(); } else { + //log is either completely in the current read buffer or truncated readBuffer.position((int) (readLSN - bufferBeginLSN)); } } catch (IOException e) { throw new ACIDException(e); } - boolean hasRemaining; - if(readBuffer.position() == readBuffer.limit()){ - hasRemaining = refillLogReadBuffer(); - if(!hasRemaining){ - throw new ACIDException("LSN is out of bounds"); - } - } + ByteBuffer readBuffer = this.readBuffer; while (true) { RecordReadStatus status = logRecord.readLogRecord(readBuffer); @@ -256,14 +263,20 @@ continue; } case TRUNCATED: { - throw new ACIDException("LSN is out of 
bounds"); + if (!fillLogReadBuffer()) { + throw new IllegalStateException( + "Could not read LSN(" + LSN + ") from log file id " + logFile.getLogFileId()); + } + //now read the complete log record + continue; } case BAD_CHKSUM: { throw new ACIDException("Log record has incorrect checksum"); } case OK: break; - + default: + throw new IllegalStateException("Unexpected log read status: " + status); } break; } @@ -272,16 +285,20 @@ return logRecord; } - private void getFileChannel() throws ACIDException { - fileChannel = logMgr.getFileChannel(readLSN, false); - fileBeginLSN = readLSN; + private void getLogFile() throws ACIDException { + try { + logFile = logMgr.getLogFile(readLSN); + fileBeginLSN = logFile.getFileBeginLSN(); + } catch (IOException e) { + throw new ACIDException(e); + } } @Override public void close() throws ACIDException { try { - if (fileChannel != null) { - fileChannel.close(); + if (logFile != null) { + logFile.close(); } } catch (IOException e) { throw new ACIDException(e); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java index 3e5c6cf..afb926b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java @@ -689,18 +689,34 @@ int abortedJobId = txnContext.getJobId().getId(); // Obtain the first/last log record LSNs written by the Job long firstLSN = txnContext.getFirstLSN(); + /** + * The effect of any log record with LSN below minFirstLSN has already been written to disk and + * will not be rolled back. Therefore, we will set the first LSN of the job to the maximum of + * minFirstLSN and the job's first LSN. 
+ */ + try { + long localMinFirstLSN = getLocalMinFirstLSN(); + firstLSN = Math.max(firstLSN, localMinFirstLSN); + } catch (HyracksDataException e) { + throw new ACIDException(e); + } long lastLSN = txnContext.getLastLSN(); - - LOGGER.log(Level.INFO, "rollbacking transaction log records from " + firstLSN + " to " + lastLSN); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("rollbacking transaction log records from " + firstLSN + " to " + lastLSN); + } // check if the transaction actually wrote some logs. - if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) { - LOGGER.log(Level.INFO, - "no need to roll back as there were no operations by the transaction " + txnContext.getJobId()); + if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN || firstLSN > lastLSN) { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info( + "no need to roll back as there were no operations by the transaction " + txnContext.getJobId()); + } return; } // While reading log records from firstLsn to lastLsn, collect uncommitted txn's Lsns - LOGGER.log(Level.INFO, "collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN); + } Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<TxnId, List<Long>>(); TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); @@ -812,7 +828,6 @@ LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/" + entityCommitLogCount + "/" + undoCount); } - } finally { logReader.close(); } -- To view, visit https://asterix-gerrit.ics.uci.edu/867 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I1c75ca4a7c8fe197451126392389d4baecbd7e45 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> 
Gerrit-Reviewer: Ian Maxon <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Young-Seok Kim <[email protected]>
