JAMES-2082 Implement blob storage
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ee45bd4e Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ee45bd4e Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ee45bd4e Branch: refs/heads/master Commit: ee45bd4e3631125cc03c8527e8b2b63b8a6b343e Parents: 67aa191 Author: quynhn <qngu...@linagora.com> Authored: Mon Jul 3 18:07:55 2017 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Mon Jul 10 14:23:55 2017 +0200 ---------------------------------------------------------------------- .../apache/james/mailbox/cassandra/BlobId.java | 73 ++++++++ .../apache/james/mailbox/cassandra/PartId.java | 72 ++++++++ .../cassandra/mail/CassandraBlobsDAO.java | 177 +++++++++++++++++++ .../cassandra/mail/utils/DataChunker.java | 52 ++++++ .../cassandra/modules/CassandraBlobModule.java | 64 +++++++ .../mailbox/cassandra/table/BlobTable.java | 33 ++++ .../table/CassandraMessageV2Table.java | 67 +++++++ .../james/mailbox/cassandra/ids/BlobIdTest.java | 83 +++++++++ .../james/mailbox/cassandra/ids/PartIdTest.java | 87 +++++++++ .../cassandra/mail/CassandraBlobsDAOTest.java | 126 +++++++++++++ .../cassandra/mail/utils/DataChunkerTest.java | 139 +++++++++++++++ 11 files changed, 973 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java new file mode 100644 index 0000000..50601b6 --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java @@ -0,0 +1,73 @@ 
+/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra; + +import org.apache.commons.codec.digest.DigestUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +public class BlobId { + public static BlobId forPayload(byte[] payload) { + Preconditions.checkArgument(payload != null); + return new BlobId(DigestUtils.sha1Hex(payload)); + } + + public static BlobId from(String id) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(id)); + return new BlobId(id); + } + + private final String id; + + @VisibleForTesting + BlobId(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + @Override + public final boolean equals(Object obj) { + if (obj instanceof BlobId) { + BlobId other = (BlobId) obj; + return Objects.equal(id, other.id); + } + return false; + } + + @Override + public final int hashCode() { + return 
Objects.hashCode(id); + } + + @Override + public String toString() { + return MoreObjects + .toStringHelper(this) + .add("id", id) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java new file mode 100644 index 0000000..421be7f --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java @@ -0,0 +1,72 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.cassandra; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +public class PartId { + public static PartId create(BlobId blobId, int position) { + Preconditions.checkNotNull(blobId); + Preconditions.checkArgument(position >= 0, "Position needs to be positive"); + return new PartId(blobId.getId() + "-" + position); + } + + public static PartId from(String id) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(id)); + return new PartId(id); + } + + private final String id; + + @VisibleForTesting + PartId(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + @Override + public final boolean equals(Object obj) { + if (obj instanceof PartId) { + PartId other = (PartId) obj; + return Objects.equal(id, other.id); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hashCode(id); + } + + @Override + public String toString() { + return MoreObjects + .toStringHelper(this) + .add("id", id) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java new file mode 100644 index 0000000..0d20e88 --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java @@ -0,0 +1,177 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation 
(ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.mailbox.cassandra.BlobId;
+import org.apache.james.mailbox.cassandra.PartId;
+import org.apache.james.mailbox.cassandra.mail.utils.DataChunker;
+import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.FluentFutureStream;
+import org.apache.james.util.OptionalConverter;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Bytes;
+
+
+/**
+ * Stores and reads binary payloads in Cassandra, split into parts.
+ *
+ * A payload is cut into chunks of at most {@link #CHUNK_SIZE} bytes. Each chunk is written
+ * to the {@code BlobParts} table under its own {@link PartId}, and the {@code Blobs} table
+ * records, for a {@link BlobId}, which part sits at which position.
+ */
+public class CassandraBlobsDAO {
+
+    /** Maximum number of bytes stored in a single part (1024 * 100). */
+    public static final int CHUNK_SIZE = 1024 * 100;
+    private final CassandraAsyncExecutor cassandraAsyncExecutor;
+    private final PreparedStatement insert;
+    private final PreparedStatement insertPart;
+    private final PreparedStatement select;
+    private final PreparedStatement selectPart;
+    private final DataChunker dataChunker;
+
+    /**
+     * Prepares the four statements (insert/select on both tables) against the given session.
+     *
+     * @param session Cassandra session used to prepare and execute the statements
+     */
+    @Inject
+    public CassandraBlobsDAO(Session session) {
+        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+        this.dataChunker = new DataChunker();
+        this.insert = prepareInsert(session);
+        this.select = prepareSelect(session);
+
+        this.insertPart = prepareInsertPart(session);
+        this.selectPart = prepareSelectPart(session);
+    }
+
+    // Selects every (position, part) row recorded for one blob id.
+    private PreparedStatement prepareSelect(Session session) {
+        return session.prepare(select()
+            .from(Blobs.TABLE_NAME)
+            .where(eq(Blobs.ID, bindMarker(Blobs.ID))));
+    }
+
+    // Selects the row holding the data of one part id.
+    private PreparedStatement prepareSelectPart(Session session) {
+        return session.prepare(select()
+            .from(BlobParts.TABLE_NAME)
+            .where(eq(BlobParts.ID, bindMarker(BlobParts.ID))));
+    }
+
+    // Inserts one (blob id, position, part id) reference row.
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(Blobs.TABLE_NAME)
+            .value(Blobs.ID, bindMarker(Blobs.ID))
+            .value(Blobs.POSITION, bindMarker(Blobs.POSITION))
+            .value(Blobs.PART, bindMarker(Blobs.PART)));
+    }
+
+    // Inserts one (part id, data) row.
+    private PreparedStatement prepareInsertPart(Session session) {
+        return session.prepare(insertInto(BlobParts.TABLE_NAME)
+            .value(BlobParts.ID, bindMarker(BlobParts.ID))
+            .value(BlobParts.DATA, bindMarker(BlobParts.DATA)));
+    }
+
+    /**
+     * Saves a payload and returns the identifier computed from its content.
+     *
+     * The parts are written first; the position-to-part references are written once the
+     * part writes have completed.
+     *
+     * @param data payload to store, may be null
+     * @return a future holding an empty Optional when data is null, otherwise the
+     *         {@link BlobId} obtained from {@link BlobId#forPayload(byte[])}
+     */
+    public CompletableFuture<Optional<BlobId>> save(byte[] data) {
+        if (data == null) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+        BlobId blobId = BlobId.forPayload(data);
+        return saveBlobParts(data, blobId)
+            .thenCompose(partIds -> saveBlobPartsReferences(blobId, partIds))
+            .thenApply(any -> Optional.of(blobId));
+    }
+
+    // Chunks the payload and writes each chunk as a part; yields (position, part id) pairs.
+    private CompletableFuture<Stream<Pair<Integer, PartId>>> saveBlobParts(byte[] data, BlobId blobId) {
+        return FluentFutureStream.of(
+            dataChunker.chunk(data, CHUNK_SIZE)
+                .map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
+                    .thenApply(partId -> Pair.of(pair.getKey(), partId))))
+            .completableFuture();
+    }
+
+    // Writes one chunk under the part id derived from the blob id and the chunk position.
+    private CompletableFuture<PartId> writePart(ByteBuffer data, BlobId blobId, int position) {
+        PartId partId = PartId.create(blobId, position);
+        return cassandraAsyncExecutor.executeVoid(
+            insertPart.bind()
+                .setString(BlobParts.ID, partId.getId())
+                .setBytes(BlobParts.DATA, data))
+            .thenApply(any -> partId);
+    }
+
+    // Records, for the blob, which part id is stored at which position.
+    private CompletableFuture<Stream<Void>> saveBlobPartsReferences(BlobId blobId, Stream<Pair<Integer, PartId>> stream) {
+        return FluentFutureStream.of(stream.map(pair ->
+            cassandraAsyncExecutor.executeVoid(insert.bind()
+                .setString(Blobs.ID, blobId.getId())
+                .setLong(Blobs.POSITION, pair.getKey())
+                .setString(Blobs.PART, pair.getValue().getId()))))
+            .completableFuture();
+    }
+
+    /**
+     * Reads back the payload stored under the given identifier.
+     *
+     * The reference rows of the blob are loaded, every referenced part is read, and the
+     * part contents are concatenated in the order the parts were obtained.
+     *
+     * @param blobId identifier of the blob to read
+     * @return a future holding the concatenated bytes; an empty array results when no
+     *         part data is found
+     */
+    public CompletableFuture<byte[]> read(BlobId blobId) {
+        return cassandraAsyncExecutor.execute(
+            select.bind()
+                .setString(Blobs.ID, blobId.getId()))
+            .thenApply(this::toPartIds)
+            .thenCompose(this::toDataParts)
+            .thenApply(this::concatenateDataParts);
+    }
+
+    // Maps each reference row to position -> part id.
+    // NOTE(review): the map keeps the row order of the result set; presumably rows come back
+    // ordered by position since it is the clustering column - confirm.
+    private ImmutableMap<Long, PartId> toPartIds(ResultSet resultSet) {
+        return CassandraUtils.convertToStream(resultSet)
+            .map(row -> Pair.of(row.getLong(Blobs.POSITION), PartId.from(row.getString(Blobs.PART))))
+            .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue));
+    }
+
+    // Reads every part referenced by the map, in map iteration order.
+    // NOTE(review): ordering/sequencing guarantees depend on CompletableFutureUtil.chainAll,
+    // which is not visible here - confirm.
+    private CompletableFuture<Stream<Optional<Row>>> toDataParts(ImmutableMap<Long, PartId> positionToIds) {
+        return CompletableFutureUtil.chainAll(
+            positionToIds.values().stream(),
+            this::readPart);
+    }
+
+    // Concatenates the data of the rows that are present; absent rows contribute nothing.
+    private byte[] concatenateDataParts(Stream<Optional<Row>> rows) {
+        ImmutableList<byte[]> parts = rows.flatMap(OptionalConverter::toStream)
+            .map(this::rowToData)
+            .collect(Guavate.toImmutableList());
+
+        return Bytes.concat(parts.toArray(new byte[parts.size()][]));
+    }
+
+    // Copies the remaining bytes of the DATA column into a fresh array.
+    private byte[] rowToData(Row row) {
+        byte[] data = new byte[row.getBytes(BlobParts.DATA).remaining()];
+        row.getBytes(BlobParts.DATA).get(data);
+        return data;
+    }
+
+    // Fetches the single row stored for a part id, if any.
+    private CompletableFuture<Optional<Row>> readPart(PartId partId) {
+        return cassandraAsyncExecutor.executeSingleRow(selectPart.bind()
+            .setString(BlobParts.ID, partId.getId()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java
new file mode 100644
index 0000000..57e0bfc
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java
@@ -0,0 +1,52 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License.
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.utils; + +import java.nio.ByteBuffer; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.apache.commons.lang3.tuple.Pair; + +import com.google.common.base.Preconditions; + +public class DataChunker { + + public Stream<Pair<Integer, ByteBuffer>> chunk(byte[] data, int chunkSize) { + Preconditions.checkNotNull(data); + Preconditions.checkArgument(chunkSize > 0, "ChunkSize can not be negative"); + + int size = data.length; + int fullChunkCount = size / chunkSize; + + return Stream.concat( + IntStream.range(0, fullChunkCount) + .mapToObj(i -> Pair.of(i, ByteBuffer.wrap(data, i * chunkSize, chunkSize))), + lastChunk(data, chunkSize * fullChunkCount, fullChunkCount)); + } + + private Stream<Pair<Integer, ByteBuffer>> lastChunk(byte[] data, int offset, int index) { + if (offset == data.length && index > 0) { + return Stream.empty(); + } + return Stream.of(Pair.of(index, ByteBuffer.wrap(data, offset, data.length - offset))); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java new file 
mode 100644 index 0000000..4eea870 --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java @@ -0,0 +1,64 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.modules; + +import java.util.List; + +import org.apache.james.backends.cassandra.components.CassandraModule; +import org.apache.james.backends.cassandra.components.CassandraTable; +import org.apache.james.backends.cassandra.components.CassandraType; +import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts; +import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.google.common.collect.ImmutableList; + +public class CassandraBlobModule implements CassandraModule { + + private final List<CassandraTable> tables; + private final List<CassandraType> types; + + public CassandraBlobModule() { + tables = ImmutableList.of( + new CassandraTable(Blobs.TABLE_NAME, + SchemaBuilder.createTable(Blobs.TABLE_NAME) + .ifNotExists() + .addPartitionKey(Blobs.ID, DataType.text()) + .addClusteringColumn(Blobs.POSITION, DataType.bigint()) + .addColumn(Blobs.PART, DataType.text())), + new CassandraTable(BlobParts.TABLE_NAME, + SchemaBuilder.createTable(BlobParts.TABLE_NAME) + .ifNotExists() + .addPartitionKey(BlobParts.ID, DataType.text()) + .addColumn(BlobParts.DATA, DataType.blob()))); + types = ImmutableList.of(); + } + + @Override + public List<CassandraTable> moduleTables() { + return tables; + } + + @Override + public List<CassandraType> moduleTypes() { + return types; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java new file mode 100644 
index 0000000..be097a5 --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java @@ -0,0 +1,33 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.table; + +public interface BlobsTable { + String TABLE_NAME = "blobs"; + String ID = "id"; + String POSITION = "position"; + String PART = "part"; + + interface BlobParts { + String TABLE_NAME = "blobParts"; + String ID = "id"; + String DATA = "data"; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java new file mode 100644 index 0000000..f7bc698 --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java @@ -0,0 +1,67 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
*
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.table;
+
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+
+/**
+ * String constants for the "messageV2" table, plus nested constant groups for its
+ * properties, its attachments and the two blob tables.
+ */
+public interface CassandraMessageV2Table {
+
+    String TABLE_NAME = "messageV2";
+    String INTERNAL_DATE = "internalDate";
+    String BODY_START_OCTET = "bodyStartOctet";
+    String FULL_CONTENT_OCTETS = "fullContentOctets";
+    String BODY_OCTECTS = "bodyOctets";
+    String TEXTUAL_LINE_COUNT = "textualLineCount";
+    String BODY_CONTENT = "bodyContent";
+    String HEADER_CONTENT = "headerContent";
+    String PROPERTIES = "properties";
+    String ATTACHMENTS = "attachments";
+
+    // Every field: metadata, both contents and attachments.
+    String[] FIELDS = {
+        MESSAGE_ID,
+        INTERNAL_DATE,
+        BODY_START_OCTET,
+        FULL_CONTENT_OCTETS,
+        BODY_OCTECTS,
+        BODY_CONTENT,
+        HEADER_CONTENT,
+        TEXTUAL_LINE_COUNT,
+        PROPERTIES,
+        ATTACHMENTS };
+    // Metadata only: neither content nor attachments.
+    String[] METADATA = {
+        MESSAGE_ID,
+        INTERNAL_DATE,
+        BODY_START_OCTET,
+        FULL_CONTENT_OCTETS,
+        BODY_OCTECTS,
+        TEXTUAL_LINE_COUNT,
+        PROPERTIES };
+    // Metadata plus the header content.
+    String[] HEADERS = {
+        MESSAGE_ID,
+        INTERNAL_DATE,
+        BODY_START_OCTET,
+        FULL_CONTENT_OCTETS,
+        BODY_OCTECTS,
+        HEADER_CONTENT,
+        TEXTUAL_LINE_COUNT,
+        PROPERTIES };
+    // Metadata plus the body content and the attachments.
+    String[] BODY = {
+        MESSAGE_ID,
+        INTERNAL_DATE,
+        BODY_START_OCTET,
+        FULL_CONTENT_OCTETS,
+        BODY_OCTECTS,
+        BODY_CONTENT,
+        TEXTUAL_LINE_COUNT,
+        PROPERTIES,
+        ATTACHMENTS };
+
+    /** Field names of a property entry. */
+    interface Properties {
+        String NAMESPACE = "namespace";
+        String NAME = "name";
+        String VALUE = "value";
+    }
+
+    /** Field names of an attachment entry. */
+    interface Attachments {
+        String ID = "id";
+        String NAME = "name";
+        String CID = "cid";
+        String IS_INLINE = "isInline";
+    }
+
+    /** Table and column names of the blob reference table. */
+    interface Blobs {
+        String TABLE_NAME = "blobs";
+        String ID = "id";
+        String POSITION = "position";
+        String PART = "part";
+    }
+
+    /** Table and column names of the blob part table. */
+    interface BlobParts {
+        String TABLE_NAME = "blobParts";
+        String ID = "id";
+        String DATA = "data";
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java
---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java new file mode 100644 index 0000000..56d6356 --- /dev/null +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java @@ -0,0 +1,83 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.ids; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.google.common.base.Charsets; + +import nl.jqno.equalsverifier.EqualsVerifier; + +public class BlobIdTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldRespectBeanContract() { + EqualsVerifier.forClass(BlobId.class).verify(); + } + + @Test + public void fromShouldConstructBlobId() { + String id = "111"; + assertThat(BlobId.from(id)) + .isEqualTo(new BlobId(id)); + } + + @Test + public void fromShouldThrowOnNull() { + expectedException.expect(IllegalArgumentException.class); + + BlobId.from(null); + } + + @Test + public void fromShouldThrowOnEmpty() { + expectedException.expect(IllegalArgumentException.class); + + BlobId.from(""); + } + + @Test + public void forPayloadShouldThrowOnNull() { + expectedException.expect(IllegalArgumentException.class); + + BlobId.forPayload(null); + } + + @Test + public void forPayloadShouldHashEmptyArray() { + BlobId blobId = BlobId.forPayload(new byte[0]); + + assertThat(blobId.getId()).isEqualTo("da39a3ee5e6b4b0d3255bfef95601890afd80709"); + } + + @Test + public void forPayloadShouldHashArray() { + BlobId blobId = BlobId.forPayload("content".getBytes(Charsets.UTF_8)); + + assertThat(blobId.getId()).isEqualTo("040f06fd774092478d450774f5ba30c5da78acc8"); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java new file mode 
100644 index 0000000..b236c55 --- /dev/null +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java @@ -0,0 +1,87 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.ids; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import nl.jqno.equalsverifier.EqualsVerifier; + +public class PartIdTest { + private static final BlobId BLOB_ID = BlobId.from("abc"); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldRespectBeanContract() { + EqualsVerifier.forClass(PartId.class).verify(); + } + + @Test + public void test () { + String id = "111"; + assertThat(PartId.from(id)) + .isEqualTo(new PartId(id)); + } + + @Test + public void fromShouldThrowOnNull() { + expectedException.expect(IllegalArgumentException.class); + + PartId.from(null); + } + + @Test + public void fromShouldThrowOnEmpty() { + expectedException.expect(IllegalArgumentException.class); + + PartId.from(""); + } + + @Test + public void createShouldThrowOnNullBlobId() { + expectedException.expect(NullPointerException.class); + + PartId.create(null, 1); + } + + @Test + public void createShouldThrowOnNegativePosition() { + expectedException.expect(IllegalArgumentException.class); + + PartId.create(BLOB_ID, -1); + } + + @Test + public void createShouldAcceptPositionZero() { + assertThat(PartId.create(BLOB_ID, 0).getId()) + .isEqualTo(BLOB_ID.getId() + "-0"); + } + + @Test + public void createShouldConcatenateBlobIdAndPosition() { + assertThat(PartId.create(BLOB_ID, 36).getId()) + .isEqualTo(BLOB_ID.getId() + "-36"); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java new file mode 100644 index 0000000..786d00e --- /dev/null +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java @@ -0,0 +1,126 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.mailbox.cassandra.BlobId; +import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Strings; + +/** + * Tests for {@code CassandraBlobsDAO}: saving byte arrays and reading them back by {@code BlobId}, + * run against a {@code CassandraCluster} created with {@code CassandraBlobModule}. + */ +public class CassandraBlobsDAOTest { + private static final int MULTIPLE_CHUNK_SIZE = 3 * CassandraBlobsDAO.CHUNK_SIZE; + private CassandraCluster cassandra; + private CassandraBlobsDAO testee; + + @Before + public void setUp() throws Exception { + cassandra = CassandraCluster.create(new CassandraBlobModule()); + cassandra.ensureAllTables(); + + testee = new CassandraBlobsDAO(cassandra.getConf()); + } + + @After + public void tearDown() throws Exception { + try { + cassandra.clearAllTables(); + } finally { + /* Close the cluster even when clearing the tables fails. */ + cassandra.close(); + } + } + + @Test + public void saveShouldReturnEmptyWhenNullData() throws Exception { + Optional<BlobId> blobId = testee.save(null).join(); + + assertThat(blobId.isPresent()).isFalse(); + } + + @Test + public void saveShouldSaveEmptyData() throws Exception { + Optional<BlobId> blobId = testee.save(new byte[]{}).join(); + + /* Check presence before dereferencing so a missing id fails as an assertion, not inside get(). */ + assertThat(blobId.isPresent()).isTrue(); + + byte[] bytes = testee.read(blobId.get()).join(); + + assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty(); + } + + @Test + public void saveShouldSaveBlankData() throws Exception { + Optional<BlobId> blobId = testee.save("".getBytes(StandardCharsets.UTF_8)).join(); + + /* Check presence before dereferencing so a missing id fails as an assertion, not inside get(). */ + assertThat(blobId.isPresent()).isTrue(); + + byte[] bytes = testee.read(blobId.get()).join(); + + assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty(); + } + + @Test + public void saveShouldReturnBlobId() throws Exception { + Optional<BlobId> blobId = 
testee.save("toto".getBytes(StandardCharsets.UTF_8)).join(); + + assertThat(blobId.isPresent()).isTrue(); + } + + @Test + public void readShouldBeEmptyWhenNotExisting() throws IOException { + byte[] bytes = testee.read(BlobId.from("unknown")).join(); + + assertThat(bytes).isEmpty(); + } + + @Test + public void readShouldReturnSavedData() throws IOException { + Optional<BlobId> blobId = testee.save("toto".getBytes(StandardCharsets.UTF_8)).join(); + + byte[] bytes = testee.read(blobId.get()).join(); + + assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("toto"); + } + + @Test + public void readShouldReturnLongSavedData() throws IOException { + String longString = Strings.repeat("0123456789\n", 1000); + Optional<BlobId> blobId = testee.save(longString.getBytes(StandardCharsets.UTF_8)).join(); + + byte[] bytes = testee.read(blobId.get()).join(); + + assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString); + } + + @Test + public void readShouldReturnSplitSavedDataByChunk() throws IOException { + String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE); + Optional<BlobId> blobId = testee.save(longString.getBytes(StandardCharsets.UTF_8)).join(); + + byte[] bytes = testee.read(blobId.get()).join(); + + assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java new file mode 100644 index 0000000..ccb5e4f --- /dev/null +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java @@ -0,0 +1,139 @@ 
+/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.utils; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.github.steveash.guavate.Guavate; +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Bytes; + +/** + * Tests for {@code DataChunker#chunk}: argument validation and the splitting of a byte array into + * position-tagged {@code ByteBuffer} chunks, using a chunk size of {@link #CHUNK_SIZE} bytes. + */ +public class DataChunkerTest { + + public static final int CHUNK_SIZE = 10; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private DataChunker testee; + + @Before + public 
void setUp() { + testee = new DataChunker(); + } + + @Test + public void chunkShouldThrowOnNullData() { + expectedException.expect(NullPointerException.class); + + testee.chunk(null, CHUNK_SIZE); + } + + @Test + public void chunkShouldThrowOnNegativeChunkSize() { + expectedException.expect(IllegalArgumentException.class); + + int chunkSize = -1; + testee.chunk(new byte[0], chunkSize); + } + + @Test + public void chunkShouldThrowOnZeroChunkSize() { + expectedException.expect(IllegalArgumentException.class); + + int chunkSize = 0; + testee.chunk(new byte[0], chunkSize); + } + + @Test + public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() { + Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(new byte[0], CHUNK_SIZE); + assertThat(toArraysWithPosition(chunks)) + .containsOnly(Pair.of(0, ImmutableList.of())); + } + + @Test + public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() { + byte[] data = "12345".getBytes(Charsets.UTF_8); + + Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE); + + assertThat(toArraysWithPosition(chunks)) + .containsOnly(Pair.of(0, ImmutableList.copyOf(ArrayUtils.toObject(data)))); + } + + @Test + public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() { + byte[] data = "1234567890".getBytes(Charsets.UTF_8); + assertThat(data.length).isEqualTo(CHUNK_SIZE); + + Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE); + + assertThat(toArraysWithPosition(chunks)) + .containsOnly(Pair.of(0, ImmutableList.copyOf(ArrayUtils.toObject(data)))); + } + + @Test + public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() { + byte[] part1 = "1234567890".getBytes(Charsets.UTF_8); + byte[] part2 = "12345".getBytes(Charsets.UTF_8); + byte[] data = Bytes.concat(part1, part2); + + Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE); + + assertThat(toArraysWithPosition(chunks)) + .containsOnly( + Pair.of(0, ImmutableList.copyOf(ArrayUtils.toObject(part1))), + 
Pair.of(1, ImmutableList.copyOf(ArrayUtils.toObject(part2)))); + } + + /** Converts every (position, buffer) chunk into a (position, list of bytes) pair that can be compared with equals. */ + private ImmutableList<Pair<Integer, List<Byte>>> toArraysWithPosition(Stream<Pair<Integer, ByteBuffer>> chunks) { + return chunks + .map(this::toByteArrayPair) + .collect(Guavate.toImmutableList()); + } + + /** Copies the bytes between the buffer's position and limit into a boxed list, keeping the chunk position as key. */ + private Pair<Integer, List<Byte>> toByteArrayPair(Pair<Integer, ByteBuffer> pair) { + /* Read from a duplicate so the chunk's own position and limit are left untouched. */ + ByteBuffer view = pair.getRight().duplicate(); + byte[] content = new byte[view.remaining()]; + view.get(content); + return Pair.of(pair.getKey(), Arrays.asList(ArrayUtils.toObject(content))); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org