yifan-c commented on code in PR #211:
URL: 
https://github.com/apache/cassandra-analytics/pull/211#discussion_r3327301498


##########
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java:
##########
@@ -324,6 +333,52 @@ public List<ByteBuffer> encodePartitionKeys(Partitioner 
partitioner, String keys
         return keys.stream().map(key -> buildPartitionKey(table, 
key)).collect(Collectors.toList());
     }
 
+    @Override
+    public void rebuildBloomFilter(@NotNull Partitioner partitioner,
+                                   @NotNull CqlTable cqltable,
+                                   @NotNull SSTable ssTable,
+                                   @NotNull Path directory) throws IOException
+    {
+        String keyspace = cqltable.keyspace();
+        String table = cqltable.table();
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+        Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, 
table, ssTable);
+        SchemaBuilder schemaBuilder = new SchemaBuilder(cqltable, partitioner);
+        TableMetadata tableMetadata = schemaBuilder.tableMetaData();
+
+        if (tableMetadata.params.bloomFilterFpChance == 1.0)
+        {
+            return; // bloom filter has been disabled for the table
+        }
+
+        try (IFilter filter = FilterDbUtils.buildBloomFilter(cqltable, 
ssTable, tableMetadata))
+        {
+            Function<ByteBuffer, Boolean> tracker = bytes -> {
+                DecoratedKey key = iPartitioner.decorateKey(bytes);
+                filter.add(key);
+                return false;
+            };
+
+            try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
+            {
+                if (primaryIndex == null)
+                {
+                    throw new IOException("Could not read Index.db file");
+                }
+                ReaderUtils.readPrimaryIndex(primaryIndex, tracker);
+            }
+
+            File filterFile = new File(directory.toFile(), 
descriptor.relativeFilenameFor(Component.FILTER));
+            try (FileOutputStream fos = new FileOutputStream(filterFile, 
false);
+                 DataOutputStreamPlus stream = new 
BufferedDataOutputStreamPlus(fos))
+            {
+                BloomFilterSerializer.serialize((BloomFilter) filter, stream);
+                stream.flush();
+                SyncUtil.sync(fos);
+            }

Review Comment:
   Should we delete the `filterFile` if it fails to rebuild, instead of 
rethrow? 
   
   I think we are fine to upload without the filter component (after deleting 
the likely corrupted filter file). Server will rebuild the filter. Rethrow 
fails the task, which causes a more expensive retry, delaying the completion of 
the spark job. 



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java:
##########
@@ -297,6 +311,18 @@ public synchronized void close(BulkWriterContext 
writerContext) throws IOExcepti
         validateSSTables(writerContext);
     }
 
+    protected void rebuildFilterComponents(@NotNull BulkWriterContext 
writerContext, @NotNull Path outputDirectory,
+                                         @NotNull DirectoryStream.Filter<Path> 
filter) throws IOException

Review Comment:
   nit: indentation is off by 2 spaces. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to