ableegoldman commented on code in PR #14853:
URL: https://github.com/apache/kafka/pull/14853#discussion_r1412620201


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -572,13 +574,19 @@ public synchronized void flush() {
 
     @Override
     public void addToBatch(final KeyValue<byte[], byte[]> record,
-                           final WriteBatch batch) throws RocksDBException {
+                           final WriteBatchInterface batch) throws RocksDBException {
         dbAccessor.addToBatch(record.key, record.value, batch);
     }
 
     @Override
-    public void write(final WriteBatch batch) throws RocksDBException {
-        db.write(wOptions, batch);
+    public void write(final WriteBatchInterface batch) throws RocksDBException {
+        if (batch instanceof WriteBatch) {
+            db.write(wOptions, (WriteBatch) batch);
+        } else if (batch instanceof WriteBatchWithIndex) {
+            db.write(wOptions, (WriteBatchWithIndex) batch);
+        } else {
+            throw new ProcessorStateException("Unknown type of batch " + batch.getClass().getCanonicalName());

Review Comment:
   nit: log an error before we throw the exception
   
   Also, imo this should just be an IllegalStateException instead of a ProcessorStateException
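
   A rough sketch of the shape I have in mind, not the actual patch. `BatchLike`, `PlainBatch`, `IndexedBatch` and `OtherBatch` are hypothetical stand-ins for `WriteBatchInterface` and its implementations so the snippet compiles without RocksDB, `java.util.logging` stands in for the store's own logger, and `write` returns a label where the real method would call `db.write(...)`:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class BatchDispatchSketch {

    // Hypothetical stand-ins for WriteBatchInterface, WriteBatch and WriteBatchWithIndex.
    interface BatchLike { }
    static final class PlainBatch implements BatchLike { }
    static final class IndexedBatch implements BatchLike { }
    // A batch type the dispatch below does not know about.
    static final class OtherBatch implements BatchLike { }

    // Assumption: java.util.logging replaces whatever logger the store really uses.
    private static final Logger LOG = Logger.getLogger(BatchDispatchSketch.class.getName());

    // Returns a label instead of writing to a database, purely for illustration.
    static String write(final BatchLike batch) {
        if (batch instanceof PlainBatch) {
            return "plain";
        } else if (batch instanceof IndexedBatch) {
            return "indexed";
        } else {
            final String message = "Unknown type of batch " + batch.getClass().getCanonicalName();
            // Log first so the failure shows up in the logs even if a caller swallows the exception.
            LOG.log(Level.SEVERE, message);
            // An unexpected batch type is a programming error, hence IllegalStateException.
            throw new IllegalStateException(message);
        }
    }
}
```

   Building the message once keeps the log line and the exception text identical, which makes the two easy to correlate.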



