yifan-c commented on code in PR #211:
URL: https://github.com/apache/cassandra-analytics/pull/211#discussion_r3327301498
##########
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java:
##########
@@ -324,6 +333,52 @@ public List<ByteBuffer> encodePartitionKeys(Partitioner partitioner, String keys
return keys.stream().map(key -> buildPartitionKey(table, key)).collect(Collectors.toList());
}
+ @Override
+ public void rebuildBloomFilter(@NotNull Partitioner partitioner,
+ @NotNull CqlTable cqltable,
+ @NotNull SSTable ssTable,
+ @NotNull Path directory) throws IOException
+ {
+ String keyspace = cqltable.keyspace();
+ String table = cqltable.table();
+ IPartitioner iPartitioner = getPartitioner(partitioner);
+ Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, table, ssTable);
+ SchemaBuilder schemaBuilder = new SchemaBuilder(cqltable, partitioner);
+ TableMetadata tableMetadata = schemaBuilder.tableMetaData();
+
+ if (tableMetadata.params.bloomFilterFpChance == 1.0)
+ {
+ return; // bloom filter has been disabled for the table
+ }
+
+ try (IFilter filter = FilterDbUtils.buildBloomFilter(cqltable, ssTable, tableMetadata))
+ {
+ Function<ByteBuffer, Boolean> tracker = bytes -> {
+ DecoratedKey key = iPartitioner.decorateKey(bytes);
+ filter.add(key);
+ return false;
+ };
+
+ try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
+ {
+ if (primaryIndex == null)
+ {
+ throw new IOException("Could not read Index.db file");
+ }
+ ReaderUtils.readPrimaryIndex(primaryIndex, tracker);
+ }
+
+ File filterFile = new File(directory.toFile(), descriptor.relativeFilenameFor(Component.FILTER));
+ try (FileOutputStream fos = new FileOutputStream(filterFile, false);
+ DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos))
+ {
+ BloomFilterSerializer.serialize((BloomFilter) filter, stream);
+ stream.flush();
+ SyncUtil.sync(fos);
+ }
Review Comment:
Should we delete the `filterFile` if the rebuild fails, instead of
rethrowing?
I think it is fine to upload without the filter component (after deleting
the likely corrupted filter file), since the server will rebuild the filter.
Rethrowing fails the task, which triggers a more expensive retry and delays
completion of the Spark job.
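To illustrate the suggestion, here is a minimal, self-contained sketch of the "delete and continue" pattern. It is not the PR's code: `BestEffortFilterWriter`, `FilterSerializer`, and `tryWriteFilter` are hypothetical names, and the callback stands in for the real `BloomFilterSerializer.serialize(...)` call. It only uses JDK APIs.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, for illustration only; not part of cassandra-analytics.
public class BestEffortFilterWriter
{
    /** Hypothetical stand-in for the real bloom filter serialization step. */
    @FunctionalInterface
    public interface FilterSerializer
    {
        void writeTo(OutputStream out) throws IOException;
    }

    /**
     * Tries to write the filter file. If writing fails, the partially written
     * file is deleted and false is returned instead of rethrowing, so the
     * caller can continue without the filter component.
     */
    public static boolean tryWriteFilter(Path filterFile, FilterSerializer serializer)
    {
        try (OutputStream out = Files.newOutputStream(filterFile))
        {
            serializer.writeTo(out);
            return true;
        }
        catch (IOException | RuntimeException failure)
        {
            // The stream is already closed here, so the file can be removed safely.
            try
            {
                Files.deleteIfExists(filterFile);
            }
            catch (IOException deleteFailure)
            {
                // A leftover, possibly corrupted file must not be uploaded, so give up.
                throw new UncheckedIOException("Could not delete partial filter file " + filterFile, deleteFailure);
            }
            System.err.println("Skipping filter component " + filterFile + ": " + failure);
            return false;
        }
    }
}
```

With something like this, the caller would treat a `false` result as "no filter component for this SSTable" and proceed with the upload. The sketch still fails hard when the cleanup itself fails, on the assumption that uploading a corrupted filter is worse than retrying the task.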
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java:
##########
@@ -297,6 +311,18 @@ public synchronized void close(BulkWriterContext writerContext) throws IOExcepti
validateSSTables(writerContext);
}
+ protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext, @NotNull Path outputDirectory,
+ @NotNull DirectoryStream.Filter<Path> filter) throws IOException
Review Comment:
nit: indentation is off by 2 spaces.