awelless commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2906706849
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -347,545 +316,1121 @@ Specifies the string (interpreted as UTF-8) to use for
demarcating multiple Kine
private static final Set<Relationship> RAW_FILE_RELATIONSHIPS =
Set.of(REL_SUCCESS);
private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS =
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
- private volatile DynamoDbAsyncClient dynamoDbClient;
- private volatile CloudWatchAsyncClient cloudWatchClient;
- private volatile KinesisAsyncClient kinesisClient;
- private volatile Scheduler kinesisScheduler;
-
+ private volatile SdkHttpClient kinesisHttpClient;
+ private volatile SdkHttpClient dynamoHttpClient;
+ private volatile KinesisClient kinesisClient;
+ private volatile DynamoDbClient dynamoDbClient;
+ private volatile SdkAsyncHttpClient asyncHttpClient;
+ private volatile KinesisShardManager shardManager;
+ private volatile KinesisConsumerClient consumerClient;
private volatile String streamName;
- private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
-
- private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
- private volatile @Nullable byte[] demarcatorValue;
+ private volatile int maxRecordsPerRequest;
+ private volatile String initialStreamPosition;
+ private volatile long maxBatchNanos;
+ private volatile long maxBatchBytes;
- private volatile Future<InitializationResult> initializationResultFuture;
- private final AtomicBoolean initialized = new AtomicBoolean();
-
- // An instance filed, so that it can be read in getRelationships.
- private volatile ProcessingStrategy processingStrategy =
ProcessingStrategy.from(
- PROCESSING_STRATEGY.getDefaultValue());
+ private volatile ProcessingStrategy processingStrategy =
ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
- @Override
- public void migrateProperties(final PropertyConfiguration config) {
- ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
- }
-
@Override
public Set<Relationship> getRelationships() {
return switch (processingStrategy) {
- case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+ case FLOW_FILE, LINE_DELIMITED, DEMARCATOR ->
RAW_FILE_RELATIONSHIPS;
case RECORD -> RECORD_FILE_RELATIONSHIPS;
};
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
if (descriptor.equals(PROCESSING_STRATEGY)) {
- processingStrategy = ProcessingStrategy.from(newValue);
+ processingStrategy = ProcessingStrategy.valueOf(newValue);
}
}
- @OnScheduled
- public void setup(final ProcessContext context) {
- readerRecordProcessor = switch (processingStrategy) {
- case FLOW_FILE, DEMARCATOR -> null;
- case RECORD -> createReaderRecordProcessor(context);
- };
- demarcatorValue = switch (processingStrategy) {
- case FLOW_FILE, RECORD -> null;
- case DEMARCATOR -> {
- final String demarcatorValue =
context.getProperty(MESSAGE_DEMARCATOR).getValue();
- yield demarcatorValue != null ?
demarcatorValue.getBytes(UTF_8) : new byte[0];
- }
- };
+ @Override
+ public void migrateProperties(final PropertyConfiguration config) {
+ config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+ config.removeProperty("Checkpoint Interval");
+ config.removeProperty("Metrics Publishing");
+ }
+
+ @Override
+ public void migrateRelationships(final RelationshipConfiguration config) {
+ config.renameRelationship("parse failure", "parse.failure");
+ }
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
final Region region = RegionUtil.getRegion(context);
final AwsCredentialsProvider credentialsProvider =
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
.asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+ final String endpointOverride =
context.getProperty(ENDPOINT_OVERRIDE).getValue();
- kinesisClient = KinesisAsyncClient.builder()
- .region(region)
- .credentialsProvider(credentialsProvider)
- .endpointOverride(getKinesisEndpointOverride())
- .httpClient(createKinesisHttpClient(context))
+ final ClientOverrideConfiguration clientConfig =
ClientOverrideConfiguration.builder()
+ .apiCallTimeout(API_CALL_TIMEOUT)
+ .apiCallAttemptTimeout(API_CALL_ATTEMPT_TIMEOUT)
.build();
- dynamoDbClient = DynamoDbAsyncClient.builder()
+ final KinesisClientBuilder kinesisBuilder = KinesisClient.builder()
.region(region)
.credentialsProvider(credentialsProvider)
- .endpointOverride(getDynamoDbEndpointOverride())
- .httpClient(createHttpClientBuilder(context).build())
- .build();
+ .overrideConfiguration(clientConfig);
- cloudWatchClient = CloudWatchAsyncClient.builder()
+ final DynamoDbClientBuilder dynamoBuilder = DynamoDbClient.builder()
.region(region)
.credentialsProvider(credentialsProvider)
- .endpointOverride(getCloudwatchEndpointOverride())
- .httpClient(createHttpClientBuilder(context).build())
- .build();
-
- streamName = context.getProperty(STREAM_NAME).getValue();
- final InitialPositionInStreamExtended initialPositionExtended =
getInitialPosition(context);
- final SingleStreamTracker streamTracker = new
SingleStreamTracker(streamName, initialPositionExtended);
-
- final long maxBytesToBuffer =
context.getProperty(MAX_BYTES_TO_BUFFER).asDataSize(DataUnit.B).longValue();
- final Duration checkpointInterval =
context.getProperty(CHECKPOINT_INTERVAL).asDuration();
- final MemoryBoundRecordBuffer memoryBoundRecordBuffer = new
MemoryBoundRecordBuffer(getLogger(), maxBytesToBuffer, checkpointInterval);
- recordBuffer = memoryBoundRecordBuffer;
- final ShardRecordProcessorFactory recordProcessorFactory = () -> new
ConsumeKinesisRecordProcessor(memoryBoundRecordBuffer);
-
- final String applicationName =
context.getProperty(APPLICATION_NAME).getValue();
- final String workerId = generateWorkerId();
- final ConfigsBuilder configsBuilder = new
ConfigsBuilder(streamTracker, applicationName, kinesisClient, dynamoDbClient,
cloudWatchClient, workerId, recordProcessorFactory);
-
- final MetricsFactory metricsFactory = configureMetricsFactory(context);
- final RetrievalSpecificConfig retrievalSpecificConfig =
configureRetrievalSpecificConfig(context, kinesisClient, streamName,
applicationName);
-
- final InitializationStateChangeListener initializationListener = new
InitializationStateChangeListener(getLogger());
- initialized.set(false);
- initializationResultFuture = initializationListener.result();
-
- kinesisScheduler = new Scheduler(
- configsBuilder.checkpointConfig(),
-
configsBuilder.coordinatorConfig().workerStateChangeListener(initializationListener),
- configsBuilder.leaseManagementConfig(),
- configsBuilder.lifecycleConfig(),
- configsBuilder.metricsConfig().metricsFactory(metricsFactory),
- configsBuilder.processorConfig(),
-
configsBuilder.retrievalConfig().retrievalSpecificConfig(retrievalSpecificConfig)
- );
-
- final String schedulerThreadName =
"%s-Scheduler-%s".formatted(getClass().getSimpleName(), getIdentifier());
- final Thread schedulerThread = new Thread(kinesisScheduler,
schedulerThreadName);
- schedulerThread.setDaemon(true);
- schedulerThread.start();
- // The thread is stopped when kinesisScheduler is shutdown in the
onStopped method.
+ .overrideConfiguration(clientConfig);
- try {
- final InitializationResult result = initializationResultFuture.get(
-
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
- checkInitializationResult(result);
- } catch (final TimeoutException e) {
- // During a first run the processor will take more time to
initialize. We return from OnSchedule and continue waiting in the onTrigger
method.
- getLogger().warn("Kinesis Scheduler initialization may take up to
10 minutes on a first run, which is caused by AWS resources initialization");
- } catch (final InterruptedException | ExecutionException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- cleanUpState();
- throw new ProcessException("Initialization failed for stream
[%s]".formatted(streamName), e);
+ if (endpointOverride != null && !endpointOverride.isEmpty()) {
+ final URI endpointUri = URI.create(endpointOverride);
+ kinesisBuilder.endpointOverride(endpointUri);
+ dynamoBuilder.endpointOverride(endpointUri);
}
- }
- /**
- * Creating Kinesis HTTP client, as per
- * {@link
software.amazon.kinesis.common.KinesisClientUtil#adjustKinesisClientBuilder(KinesisAsyncClientBuilder)}.
- */
- private static SdkAsyncHttpClient createKinesisHttpClient(final
ProcessContext context) {
- return createHttpClientBuilder(context)
- .protocol(Protocol.HTTP2)
- // Since we're using HTTP/2, multiple concurrent requests will
reuse the same HTTP connection.
- // Therefore, the number of real connections is going to be
relatively small.
- .maxConcurrency(Integer.MAX_VALUE)
- .http2Configuration(Http2Configuration.builder()
-
.initialWindowSize(KINESIS_HTTP_CLIENT_WINDOW_SIZE_BYTES)
-
.healthCheckPingPeriod(KINESIS_HTTP_HEALTH_CHECK_PERIOD)
- .build())
- .build();
- }
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(context);
- private static NettyNioAsyncHttpClient.Builder
createHttpClientBuilder(final ProcessContext context) {
- final NettyNioAsyncHttpClient.Builder builder =
NettyNioAsyncHttpClient.builder()
- .connectionTimeout(HTTP_CLIENTS_CONNECTION_TIMEOUT)
- .readTimeout(HTTP_CLIENTS_READ_TIMEOUT);
+ kinesisHttpClient = buildApacheHttpClient(proxyConfig,
PollingKinesisClient.MAX_CONCURRENT_FETCHES + 10);
+ dynamoHttpClient = buildApacheHttpClient(proxyConfig, 50);
+ kinesisBuilder.httpClient(kinesisHttpClient);
+ dynamoBuilder.httpClient(dynamoHttpClient);
+
+ kinesisClient = kinesisBuilder.build();
+ dynamoDbClient = dynamoBuilder.build();
+
+ final String checkpointTableName =
context.getProperty(APPLICATION_NAME).getValue();
+ streamName = context.getProperty(STREAM_NAME).getValue();
+ maxRecordsPerRequest =
context.getProperty(MAX_RECORDS_PER_REQUEST).asInteger();
+ initialStreamPosition =
context.getProperty(INITIAL_STREAM_POSITION).getValue();
+ maxBatchNanos =
context.getProperty(MAX_BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
+ maxBatchBytes =
context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B).longValue();
+
+ shardManager = createShardManager(kinesisClient, dynamoDbClient,
getLogger(), checkpointTableName, streamName);
+ shardManager.ensureCheckpointTableExists();
+
+ final boolean efoMode =
ConsumerType.ENHANCED_FAN_OUT.equals(context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class));
+ consumerClient = createConsumerClient(kinesisClient, getLogger(),
efoMode);
+
+ final Instant timestampForPosition = resolveTimestampPosition(context);
+ if (timestampForPosition != null) {
+ if (consumerClient instanceof PollingKinesisClient polling) {
+ polling.setTimestampForInitialPosition(timestampForPosition);
+ } else if (consumerClient instanceof EfoKinesisClient efo) {
+ efo.setTimestampForInitialPosition(timestampForPosition);
+ }
+ }
- final ProxyConfigurationService proxyConfigService =
context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
- if (proxyConfigService != null) {
- final ProxyConfiguration proxyConfig =
proxyConfigService.getConfiguration();
+ if (efoMode) {
+ final NettyNioAsyncHttpClient.Builder nettyBuilder =
NettyNioAsyncHttpClient.builder()
+ .maxConcurrency(500)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(60));
- final
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder
proxyConfigBuilder =
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder()
+ if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+ final
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder
nettyProxyBuilder =
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder()
.host(proxyConfig.getProxyServerHost())
.port(proxyConfig.getProxyServerPort());
- if (proxyConfig.hasCredential()) {
- proxyConfigBuilder.username(proxyConfig.getProxyUserName());
-
proxyConfigBuilder.password(proxyConfig.getProxyUserPassword());
- }
+ if (proxyConfig.hasCredential()) {
+ nettyProxyBuilder.username(proxyConfig.getProxyUserName());
+
nettyProxyBuilder.password(proxyConfig.getProxyUserPassword());
+ }
- builder.proxyConfiguration(proxyConfigBuilder.build());
- }
+ nettyBuilder.proxyConfiguration(nettyProxyBuilder.build());
+ }
- return builder;
- }
+ asyncHttpClient = nettyBuilder.build();
- private ReaderRecordProcessor createReaderRecordProcessor(final
ProcessContext context) {
- final RecordReaderFactory recordReaderFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
- final RecordSetWriterFactory recordWriterFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final KinesisAsyncClientBuilder asyncBuilder =
KinesisAsyncClient.builder()
+ .region(region)
+ .credentialsProvider(credentialsProvider)
+ .httpClient(asyncHttpClient);
- final OutputStrategy outputStrategy =
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
- final KinesisRecordConverter converter = switch (outputStrategy) {
- case USE_VALUE -> new ValueRecordConverter();
- case USE_WRAPPER -> new WrapperRecordConverter();
- case INJECT_METADATA -> new InjectMetadataRecordConverter();
- };
+ if (endpointOverride != null && !endpointOverride.isEmpty()) {
+ asyncBuilder.endpointOverride(URI.create(endpointOverride));
+ }
- return new ReaderRecordProcessor(recordReaderFactory, converter,
recordWriterFactory, getLogger());
+ final String consumerName =
context.getProperty(APPLICATION_NAME).getValue();
+ consumerClient.initialize(asyncBuilder.build(), streamName,
consumerName);
+ }
}
- private static InitialPositionInStreamExtended getInitialPosition(final
ProcessContext context) {
- final InitialPosition initialPosition =
context.getProperty(INITIAL_STREAM_POSITION).asAllowableValue(InitialPosition.class);
- return switch (initialPosition) {
- case TRIM_HORIZON ->
-
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
- case LATEST ->
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
- case AT_TIMESTAMP -> {
- final String timestampValue =
context.getProperty(STREAM_POSITION_TIMESTAMP).getValue();
- final Instant timestamp = Instant.parse(timestampValue);
- yield
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(timestamp));
- }
- };
+ private static Instant resolveTimestampPosition(final ProcessContext
context) {
+ final InitialPosition position =
context.getProperty(INITIAL_STREAM_POSITION).asAllowableValue(InitialPosition.class);
+ if (position == InitialPosition.AT_TIMESTAMP) {
+ return
Instant.parse(context.getProperty(STREAM_POSITION_TIMESTAMP).getValue());
+ }
+ return null;
}
- private String generateWorkerId() {
- final String processorId = getIdentifier();
- final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
-
- final String workerId;
+ /**
+ * Builds an {@link ApacheHttpClient} with the given connection pool size
and optional proxy
+ * configuration. Each AWS service client (Kinesis, DynamoDB) should
receive its own HTTP client
+ * so their connection pools are isolated and cannot starve each other
under high shard counts.
+ */
+ private static SdkHttpClient buildApacheHttpClient(final
ProxyConfiguration proxyConfig, final int maxConnections) {
+ final ApacheHttpClient.Builder builder = ApacheHttpClient.builder()
+ .maxConnections(maxConnections);
- if (nodeTypeProvider.isClustered()) {
- // If a node id is not available for some reason, generating a
random UUID helps to avoid collisions.
- final String nodeId =
nodeTypeProvider.getCurrentNode().orElse(UUID.randomUUID().toString());
- workerId = "%s@%s".formatted(processorId, nodeId);
- } else {
- workerId = processorId;
- }
+ if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+ final URI proxyEndpoint = URI.create(String.format("http://%s:%s",
proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort()));
+ final
software.amazon.awssdk.http.apache.ProxyConfiguration.Builder proxyBuilder =
+
software.amazon.awssdk.http.apache.ProxyConfiguration.builder().endpoint(proxyEndpoint);
- return workerId;
- }
+ if (proxyConfig.hasCredential()) {
+ proxyBuilder.username(proxyConfig.getProxyUserName());
+ proxyBuilder.password(proxyConfig.getProxyUserPassword());
+ }
- private static @Nullable MetricsFactory configureMetricsFactory(final
ProcessContext context) {
- final MetricsPublishing metricsPublishing =
context.getProperty(METRICS_PUBLISHING).asAllowableValue(MetricsPublishing.class);
- return switch (metricsPublishing) {
- case DISABLED -> new NullMetricsFactory();
- case LOGS -> new LogMetricsFactory();
- case CLOUDWATCH -> null; // If no metrics factory was provided,
CloudWatch metrics factory is used by default.
- };
- }
+ builder.proxyConfiguration(proxyBuilder.build());
+ }
- private static RetrievalSpecificConfig configureRetrievalSpecificConfig(
- final ProcessContext context,
- final KinesisAsyncClient kinesisClient,
- final String streamName,
- final String applicationName) {
- final ConsumerType consumerType =
context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class);
- return switch (consumerType) {
- case SHARED_THROUGHPUT -> new
PollingConfig(kinesisClient).streamName(streamName);
- case ENHANCED_FAN_OUT -> new
FanOutConfig(kinesisClient).streamName(streamName).applicationName(applicationName);
- };
+ return builder.build();
}
@OnStopped
public void onStopped() {
- cleanUpState();
+ if (shardManager != null) {
+ shardManager.releaseAllLeases();
+ shardManager.close();
+ shardManager = null;
+ }
- initialized.set(false);
- initializationResultFuture = null;
- }
+ if (consumerClient != null) {
+ consumerClient.close();
+ consumerClient = null;
+ }
- private void cleanUpState() {
- if (kinesisScheduler != null) {
- shutdownScheduler();
- kinesisScheduler = null;
+ if (asyncHttpClient != null) {
+ asyncHttpClient.close();
+ asyncHttpClient = null;
}
if (kinesisClient != null) {
kinesisClient.close();
kinesisClient = null;
}
+
if (dynamoDbClient != null) {
dynamoDbClient.close();
dynamoDbClient = null;
}
- if (cloudWatchClient != null) {
- cloudWatchClient.close();
- cloudWatchClient = null;
- }
- recordBuffer = null;
- readerRecordProcessor = null;
- demarcatorValue = null;
+ closeQuietly(kinesisHttpClient);
+ kinesisHttpClient = null;
+ closeQuietly(dynamoHttpClient);
+ dynamoHttpClient = null;
}
- private void shutdownScheduler() {
- if (kinesisScheduler.shutdownComplete()) {
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final int clusterMemberCount = Math.max(1,
getNodeTypeProvider().getClusterMembers().size());
+ shardManager.refreshLeasesIfNecessary(clusterMemberCount);
+ final List<Shard> ownedShards = shardManager.getOwnedShards();
+
+ if (ownedShards.isEmpty()) {
+ context.yield();
return;
}
- final long start = System.nanoTime();
- getLogger().debug("Shutting down Kinesis Scheduler");
+ final Set<String> ownedShardIds = new HashSet<>();
+ for (final Shard shard : ownedShards) {
+ ownedShardIds.add(shard.shardId());
+ }
+
+ consumerClient.removeUnownedShards(ownedShardIds);
+ consumerClient.startFetches(ownedShards, streamName,
maxRecordsPerRequest, initialStreamPosition, shardManager);
+ consumerClient.logDiagnostics(ownedShards.size(),
shardManager.getCachedShardCount());
+
+ final Set<String> claimedShards = new HashSet<>();
+ final List<ShardFetchResult> consumed = consumeRecords(claimedShards);
+ final List<ShardFetchResult> accepted =
discardRelinquishedResults(consumed, claimedShards);
+
+ if (accepted.isEmpty()) {
+ consumerClient.releaseShards(claimedShards);
+ context.yield();
+ return;
+ }
- boolean gracefulShutdownSucceeded;
+ final PartitionedBatch batch = partitionByShardAndCheckpoint(accepted);
+
+ final WriteResult output;
try {
- gracefulShutdownSucceeded =
kinesisScheduler.startGracefulShutdown().get(KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(),
SECONDS);
- if (!gracefulShutdownSucceeded) {
- getLogger().warn("Failed to shutdown Kinesis Scheduler
gracefully. See the logs for more details");
- }
- } catch (final RuntimeException | InterruptedException |
ExecutionException | TimeoutException e) {
- if (e instanceof TimeoutException) {
- getLogger().warn("Failed to shutdown Kinesis Scheduler
gracefully after {} seconds",
KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(), e);
- } else {
- getLogger().warn("Failed to shutdown Kinesis Scheduler
gracefully", e);
- }
- gracefulShutdownSucceeded = false;
+ output = writeResults(session, context, batch.resultsByShard());
+ } catch (final Exception e) {
+ handleWriteFailure(e, accepted, claimedShards, context);
+ return;
}
- if (!gracefulShutdownSucceeded) {
- kinesisScheduler.shutdown();
+ if (output.produced().isEmpty() && output.parseFailures().isEmpty()) {
+ consumerClient.releaseShards(claimedShards);
+ context.yield();
+ return;
}
- final long finish = System.nanoTime();
- getLogger().debug("Kinesis Scheduler shutdown finished after {}
seconds", NANOSECONDS.toSeconds(finish - start));
+ session.transfer(output.produced(), REL_SUCCESS);
+ if (!output.parseFailures().isEmpty()) {
+ session.transfer(output.parseFailures(), REL_PARSE_FAILURE);
+ session.adjustCounter("Records Parse Failure",
output.parseFailures().size(), false);
+ }
+ session.adjustCounter("Records Consumed", output.totalRecordCount(),
false);
+ final long dedupEvents = consumerClient.drainDeduplicatedEventCount();
+ if (dedupEvents > 0) {
+ session.adjustCounter("EFO Deduplicated Events", dedupEvents,
false);
+ }
+
+ session.commitAsync(
+ () -> {
+ try {
+ shardManager.writeCheckpoints(batch.checkpoints());
+ consumerClient.acknowledgeResults(accepted);
Review Comment:
If `writeCheckpoints` fails, we don't `acknowledgeResults` in this callback.
It seems efo consumer will be stuck in that situation, as we request next
records in the acknowledgement. Shall we swap these operations? Or have a
ladder with `try {} finally {}` statement.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]