rustyrazorblade commented on code in PR #211:
URL: 
https://github.com/apache/cassandra-analytics/pull/211#discussion_r3320569695


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java:
##########
@@ -297,6 +311,25 @@ public synchronized void close(BulkWriterContext 
writerContext) throws IOExcepti
         validateSSTables(writerContext);
     }
 
+    private void rebuildFilterComponents(@NotNull BulkWriterContext 
writerContext, @NotNull Path outputDirectory,
+                                         @NotNull DirectoryStream.Filter<Path> 
filter) throws IOException
+    {
+        LocalDataLayer layer = buildLocalDataLayer(writerContext, 
outputDirectory, null);
+        for (Path dataFile : getDataFileStream(filter))
+        {
+            try
+            {
+                FileSystemSSTable ssTable = new FileSystemSSTable(dataFile, 
false, BufferingInputStreamStats::doNothingStats);
+                writerContext.bridge().rebuildBloomFilter(layer.partitioner(), 
layer.cqlTable(), ssTable, outputDirectory);
+                LOGGER.error("Rebuilt bloom filter for sstable {}", dataFile);

Review Comment:
   I think this should be INFO level.  It'll be confusing to users why every 
one of their jobs has errors.



##########
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java:
##########
@@ -324,6 +333,44 @@ public List<ByteBuffer> encodePartitionKeys(Partitioner 
partitioner, String keys
         return keys.stream().map(key -> buildPartitionKey(table, 
key)).collect(Collectors.toList());
     }
 
+    @Override
+    public void rebuildBloomFilter(@NotNull Partitioner partitioner,
+                                   @NotNull CqlTable cqltable,
+                                   @NotNull SSTable ssTable,
+                                   @NotNull Path directory) throws IOException
+    {
+        String keyspace = cqltable.keyspace();
+        String table = cqltable.table();
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+        Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, 
table, ssTable);
+        try (IFilter filter = FilterDbUtils.buildBloomFilter(partitioner, 
cqltable, ssTable))
+        {
+            Function<ByteBuffer, Boolean> tracker = bytes -> {
+                DecoratedKey key = iPartitioner.decorateKey(bytes);
+                filter.add(key);
+                return false;
+            };
+
+            try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
+            {
+                if (primaryIndex == null)
+                {
+                    throw new IOException("Could not read Index.db file");
+                }
+                ReaderUtils.readPrimaryIndex(primaryIndex, tracker);
+            }
+
+            File filterFile = new File(directory.toFile(), 
descriptor.relativeFilenameFor(Component.FILTER));
+            try (FileOutputStream fos = new FileOutputStream(filterFile, 
false);
+                 DataOutputStreamPlus stream = new 
BufferedDataOutputStreamPlus(fos))
+            {
+                BloomFilterSerializer.serialize((BloomFilter) filter, stream);

Review Comment:
   I'm not sure about this, but if the user has bloom filters disabled, I think 
this might throw an exception.  In 5.0 it uses IFilter.



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java:
##########
@@ -148,6 +162,71 @@ public void canCreateWriterForVersion(String version) 
throws IOException
         tw.validateSSTables(writerContext, tw.getOutDir(), dataFilePaths);
     }
 
+    @ParameterizedTest
+    @MethodSource("supportedVersions")
+    public void testBloomFilterRebuild(String version) throws IOException
+    {
+        int rowCount = 50_000;
+        CassandraBridge bridge = CassandraBridgeFactory.get(version);
+        MockBulkWriterContext writerContext = new 
MockBulkWriterContext(tokenRangeMapping, version, 
ConsistencyLevel.CL.LOCAL_QUORUM);
+        Partitioner partitioner = writerContext.getPartitioner();
+        CqlTable cqlTable = 
bridge.buildSchema(writerContext.schema().getTableSchema().createStatement,
+                                               
writerContext.qualifiedTableName().keyspace(),
+                                               new 
ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy,
+                                                                     
ImmutableMap.of("replication_factor", 1)),
+                                               partitioner,
+                                               Collections.emptySet());
+        SortedMap<BigInteger, List<String>> sortedKeys = new TreeMap<>();
+        for (int i = 0; i < rowCount; ++i)
+        {
+            List<String> keys = ImmutableList.of(String.valueOf(i), "1");
+            AbstractMap.SimpleEntry<ByteBuffer, BigInteger> partitionKey = 
bridge.getPartitionKey(cqlTable, partitioner, keys);
+            sortedKeys.put(partitionKey.getValue(), keys);
+        }
+
+        SortedSSTableWriter tw = new SortedSSTableWriter(writerContext, 
tmpDir, new XXHash32DigestAlgorithm(), 1);
+        List<SSTableDescriptor> allSSTables = new ArrayList<>();
+        tw.setSSTablesProducedListener(allSSTables::addAll);
+        for (BigInteger token : sortedKeys.keySet())
+        {
+            List<String> partitionKey = sortedKeys.get(token);
+            tw.addRow(token,
+                      ImmutableMap.of("id", 
Integer.parseInt(partitionKey.get(0)),
+                                      "date", 
Integer.parseInt(partitionKey.get(1)),
+                                      "course", "foo", "marks", 1));
+        }
+        tw.close(writerContext);
+
+        assertThat(allSSTables).hasSize(1);
+
+        Set<Path> filterFilePaths = new HashSet<>();
+        try (DirectoryStream<Path> filterFileStream = 
Files.newDirectoryStream(tw.getOutDir(), "*-Filter.db"))
+        {
+            filterFileStream.forEach(filterFilePaths::add);
+        }
+
+        assertThat(filterFilePaths).hasSize(1);
+
+        Path filterFile = filterFilePaths.iterator().next();
+        String dataFileName = filterFile.toFile().getName().replace("-Filter", 
"-Data");
+        Path dataFilePath = filterFile.getParent().resolve(dataFileName);
+        FileSystemSSTable ssTable = new FileSystemSSTable(dataFilePath, false, 
BufferingInputStreamStats::doNothingStats);
+
+        BloomFilter bloomFilter = bridge.openBloomFilter(partitioner,
+                                                         
writerContext.qualifiedTableName().keyspace(),
+                                                         
writerContext.qualifiedTableName().table(),
+                                                         ssTable);
+
+        // second column is always set to 1 when inserting data
+        List<ByteBuffer> searchKeys = bridge.encodePartitionKeys(partitioner,
+                                                                 
writerContext.qualifiedTableName().keyspace(),
+                                                                 
writerContext.schema().getTableSchema().createStatement,
+                                                                 
ImmutableList.of(ImmutableList.of("1", "1"), ImmutableList.of("7", "2")));
+
+        assertThat(bloomFilter.mightContain(searchKeys.get(0))).isTrue();
+        assertThat(bloomFilter.doesNotContain(searchKeys.get(1))).isTrue();

Review Comment:
   Since we're using a limited set of data in here, it's very unlikely we'll 
ever hit a bloom filter false positive on this assertion... so maybe just 
leave a comment.  If the test data changes, we _could_ hit one, and it'll 
cause the test to fail.  I know it's really, really unlikely, so please just 
document it.



##########
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/utils/FilterDbUtils.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.reader.ReaderUtils;
+import org.apache.cassandra.spark.reader.SchemaBuilder;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Helper methods for managing bloom filters.
+ */
+public class FilterDbUtils
+{
+    private FilterDbUtils()
+    {
+        throw new IllegalStateException(getClass() + " is static utility class 
and shall not be instantiated");
+    }
+
+    public static IFilter buildBloomFilter(@NotNull Partitioner partitioner,
+                                           @NotNull CqlTable cqltable,
+                                           @NotNull SSTable ssTable) throws 
IOException
+    {
+        String keyspace = cqltable.keyspace();
+        String table = cqltable.table();
+
+        Map<MetadataType, MetadataComponent> componentMap = 
ReaderUtils.deserializeStatsMetadata(keyspace, table, ssTable, 
EnumSet.of(MetadataType.STATS));
+        StatsMetadata statsMetadata = (StatsMetadata) 
componentMap.get(MetadataType.STATS);

Review Comment:
   Is there a possibility this could be NULL?  



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java:
##########
@@ -297,6 +311,25 @@ public synchronized void close(BulkWriterContext 
writerContext) throws IOExcepti
         validateSSTables(writerContext);
     }
 
+    private void rebuildFilterComponents(@NotNull BulkWriterContext 
writerContext, @NotNull Path outputDirectory,
+                                         @NotNull DirectoryStream.Filter<Path> 
filter) throws IOException
+    {
+        LocalDataLayer layer = buildLocalDataLayer(writerContext, 
outputDirectory, null);
+        for (Path dataFile : getDataFileStream(filter))
+        {
+            try
+            {
+                FileSystemSSTable ssTable = new FileSystemSSTable(dataFile, 
false, BufferingInputStreamStats::doNothingStats);
+                writerContext.bridge().rebuildBloomFilter(layer.partitioner(), 
layer.cqlTable(), ssTable, outputDirectory);
+                LOGGER.error("Rebuilt bloom filter for sstable {}", dataFile);
+            }
+            catch (Exception e)
+            {
+                LOGGER.warn("Failed to rebuild bloom filter for sstable {}", 
dataFile, e);

Review Comment:
   Is it safe to swallow this exception?  How would it fail?  Maybe make it an 
error?



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java:
##########
@@ -31,6 +31,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;

Review Comment:
   nit / question: Is this commons lang3?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to