ableegoldman commented on code in PR #14853: URL: https://github.com/apache/kafka/pull/14853#discussion_r1412620201
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ########## @@ -572,13 +574,19 @@ public synchronized void flush() { @Override public void addToBatch(final KeyValue<byte[], byte[]> record, - final WriteBatch batch) throws RocksDBException { + final WriteBatchInterface batch) throws RocksDBException { dbAccessor.addToBatch(record.key, record.value, batch); } @Override - public void write(final WriteBatch batch) throws RocksDBException { - db.write(wOptions, batch); + public void write(final WriteBatchInterface batch) throws RocksDBException { + if (batch instanceof WriteBatch) { + db.write(wOptions, (WriteBatch) batch); + } else if (batch instanceof WriteBatchWithIndex) { + db.write(wOptions, (WriteBatchWithIndex) batch); + } else { + throw new ProcessorStateException("Unknown type of batch " + batch.getClass().getCanonicalName()); Review Comment: nit: log an error before we throw the exception Also, imo this should just be an IllegalStateException instead of a ProcessorStateException -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org