lucasbru commented on code in PR #21535:
URL: https://github.com/apache/kafka/pull/21535#discussion_r2879072689
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +311,103 @@ private void reprocessState(final List<TopicPartition>
topicPartitions,
record.headers());
globalProcessorContext.setRecordContext(recordContext);
- try {
- if (record.key() != null) {
- source.process(new Record(
+ if (record.key() != null) {
+ // Deserialization phase
+ final Record<?, ?> deserializedRecord;
+ try {
+ deserializedRecord = new Record<>(
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
reprocessFactory.valueDeserializer().deserialize(record.topic(),
record.value()),
record.timestamp(),
- record.headers()));
+ record.headers());
+ } catch (final Exception deserializationException) {
+ // while Java distinguishes checked vs unchecked
exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need
to catch `Exception`
+ // (instead of `RuntimeException`) to work well
with those languages
+ handleDeserializationFailure(
+ deserializationExceptionHandler,
+ globalProcessorContext,
+ deserializationException,
+ record,
+ log,
+ droppedRecordsSensor(
+ Thread.currentThread().getName(),
+ globalProcessorContext.taskId().toString(),
+ globalProcessorContext.metrics()
+ ),
+ null
+ );
+ continue; // Skip this record
+ }
+ final ProcessingExceptionHandler.Response response;
+ // Processing phase
+ try {
+ @SuppressWarnings("unchecked")
+ final Processor<Object, Object, Object, Object>
typedSource =
+ (Processor<Object, Object, Object, Object>)
source;
+ @SuppressWarnings("unchecked")
+ final Record<Object, Object> typedRecord =
(Record<Object, Object>) deserializedRecord;
+ typedSource.process(typedRecord);
restoreCount++;
batchRestoreCount++;
+ } catch (final Exception processingException) {
+ // while Java distinguishes checked vs unchecked
exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need
to catch `Exception`
+ // (instead of `RuntimeException`) to work well
with those languages
+ if (processingExceptionHandler != null) {
+ final ErrorHandlerContext errorHandlerContext
= new DefaultErrorHandlerContext(
+ globalProcessorContext,
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.headers(),
+ storeName,
+ globalProcessorContext.taskId(),
+ record.timestamp(),
+ record.key(),
+ record.value()
+ );
+ try {
+ response =
+
Objects.requireNonNull(processingExceptionHandler.handleError(
+ errorHandlerContext,
+ deserializedRecord,
+ processingException
+ ), "Invalid
ProcessingExceptionHandler response");
+ log.warn("Dead letter queue records cannot
be sent for GlobalKTable processors " +
+ "(no producer available).
DLQ support for GlobalKTable will be addressed in a future KIP. " + "Record
context: {}",
+ errorHandlerContext);
+ } catch (Exception fatalUserException) {
+ log.error(
+ "Processing error callback failed
after processing error for record: {}",
+ errorHandlerContext,
+ processingException
+ );
Review Comment:
nit: missing space after `if` — should be `if (!response...`. Will fail
checkstyle.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java:
##########
@@ -342,13 +354,144 @@ public void shouldGetToRunningWithOnlyGlobalTopology()
throws Exception {
kafkaStreams.close();
}
+ private void createBuilderWithFailedProcessor() {
+ builder = new StreamsBuilder();
+ builder.addGlobalStore(
+ Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("test-global-store"),
+ Serdes.Long(),
+ Serdes.String()
+ ),
+ globalTableTopic,
+ Consumed.with(Serdes.Long(), Serdes.String()),
+ () -> new ContextualProcessor<Long, String, Void, Void>() {
+ @Override
+ public void process(final Record<Long, String> record) {
+ if (record.key() == 2L) {
+ throw new RuntimeException("Test processing
exception");
+ }
+ }
+ }
+ );
+ }
+
+ @Test
+ public void
testProcessingExceptionHandlerContinueEnabledRestorationPhase() throws
Exception {
+ createBuilderWithFailedProcessor();
+ // enable processing exception handler invoked config
+ TestGlobalProcessingExceptionHandler.shouldResume = true;
+
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
true);
+
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+ TestGlobalProcessingExceptionHandler.class);
+
+ produceInitialGlobalTableValues();
+ startStreams();
+ waitForApplicationState(singletonList(kafkaStreams), State.RUNNING,
Duration.ofSeconds(30));
+
+ assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+ }
+ @Test
Review Comment:
nit: add blank lines between test methods to match the rest of the file.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java:
##########
@@ -342,13 +354,144 @@ public void shouldGetToRunningWithOnlyGlobalTopology()
throws Exception {
kafkaStreams.close();
}
+ private void createBuilderWithFailedProcessor() {
+ builder = new StreamsBuilder();
+ builder.addGlobalStore(
+ Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("test-global-store"),
+ Serdes.Long(),
+ Serdes.String()
+ ),
+ globalTableTopic,
+ Consumed.with(Serdes.Long(), Serdes.String()),
+ () -> new ContextualProcessor<Long, String, Void, Void>() {
+ @Override
+ public void process(final Record<Long, String> record) {
+ if (record.key() == 2L) {
+ throw new RuntimeException("Test processing
exception");
+ }
+ }
+ }
+ );
+ }
+
+ @Test
+ public void
testProcessingExceptionHandlerContinueEnabledRestorationPhase() throws
Exception {
+ createBuilderWithFailedProcessor();
+ // enable processing exception handler invoked config
+ TestGlobalProcessingExceptionHandler.shouldResume = true;
+
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
true);
+
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+ TestGlobalProcessingExceptionHandler.class);
+
+ produceInitialGlobalTableValues();
+ startStreams();
+ waitForApplicationState(singletonList(kafkaStreams), State.RUNNING,
Duration.ofSeconds(30));
+
+ assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+ }
+ @Test
+ public void testProcessingExceptionHandlerFailEnabledRestorationPhase()
throws Exception {
+ createBuilderWithFailedProcessor();
+ // enable processing exception handler invoked config
+ TestGlobalProcessingExceptionHandler.shouldResume = false;
+
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
true);
+
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+ TestGlobalProcessingExceptionHandler.class);
+
+ produceInitialGlobalTableValues();
+ assertThrows(StreamsException.class, () -> {
+ startStreams();
+ });
+ assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+
+ }
+ @Test
+ public void testProcessingExceptionHandlerDisabledRestorationPhase()
throws Exception {
+ createBuilderWithFailedProcessor();
+ // enable processing exception handler invoked config
Review Comment:
nit: comment says "enable processing exception handler invoked config" but
the config is set to `false` here. Same issue in
`testProcessingExceptionHandlerDisabledRunTimePhase`.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java:
##########
@@ -342,13 +354,144 @@ public void shouldGetToRunningWithOnlyGlobalTopology()
throws Exception {
kafkaStreams.close();
}
+ private void createBuilderWithFailedProcessor() {
+ builder = new StreamsBuilder();
+ builder.addGlobalStore(
+ Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("test-global-store"),
+ Serdes.Long(),
+ Serdes.String()
+ ),
+ globalTableTopic,
+ Consumed.with(Serdes.Long(), Serdes.String()),
+ () -> new ContextualProcessor<Long, String, Void, Void>() {
+ @Override
+ public void process(final Record<Long, String> record) {
+ if (record.key() == 2L) {
Review Comment:
`record.key() == 2L` uses reference equality on boxed `Long`. Works here due
to Long caching for small values, but use `.equals(2L)` instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]