markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907039969


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -347,545 +316,1121 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
     private static final Set<Relationship> RAW_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS);
     private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-    private volatile DynamoDbAsyncClient dynamoDbClient;
-    private volatile CloudWatchAsyncClient cloudWatchClient;
-    private volatile KinesisAsyncClient kinesisClient;
-    private volatile Scheduler kinesisScheduler;
-
+    private volatile SdkHttpClient kinesisHttpClient;
+    private volatile SdkHttpClient dynamoHttpClient;
+    private volatile KinesisClient kinesisClient;
+    private volatile DynamoDbClient dynamoDbClient;
+    private volatile SdkAsyncHttpClient asyncHttpClient;
+    private volatile KinesisShardManager shardManager;
+    private volatile KinesisConsumerClient consumerClient;
     private volatile String streamName;
-    private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
-
-    private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-    private volatile @Nullable byte[] demarcatorValue;
+    private volatile int maxRecordsPerRequest;
+    private volatile String initialStreamPosition;
+    private volatile long maxBatchNanos;
+    private volatile long maxBatchBytes;
 
-    private volatile Future<InitializationResult> initializationResultFuture;
-    private final AtomicBoolean initialized = new AtomicBoolean();
-
-    // An instance field, so that it can be read in getRelationships.
-    private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.from(
-            PROCESSING_STRATEGY.getDefaultValue());
+    private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTY_DESCRIPTORS;
     }
 
-    @Override
-    public void migrateProperties(final PropertyConfiguration config) {
-        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-    }
-
     @Override
     public Set<Relationship> getRelationships() {
         return switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+            case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> 
RAW_FILE_RELATIONSHIPS;
             case RECORD -> RECORD_FILE_RELATIONSHIPS;
         };
     }
 
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (descriptor.equals(PROCESSING_STRATEGY)) {
-            processingStrategy = ProcessingStrategy.from(newValue);
+            processingStrategy = ProcessingStrategy.valueOf(newValue);
         }
     }
 
-    @OnScheduled
-    public void setup(final ProcessContext context) {
-        readerRecordProcessor = switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> null;
-            case RECORD -> createReaderRecordProcessor(context);
-        };
-        demarcatorValue = switch (processingStrategy) {
-            case FLOW_FILE, RECORD -> null;
-            case DEMARCATOR -> {
-                final String demarcatorValue = 
context.getProperty(MESSAGE_DEMARCATOR).getValue();
-                yield demarcatorValue != null ? 
demarcatorValue.getBytes(UTF_8) : new byte[0];
-            }
-        };
+    @Override
+    public void migrateProperties(final PropertyConfiguration config) {
+        config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+        config.removeProperty("Checkpoint Interval");
+        config.removeProperty("Metrics Publishing");
+    }
+
+    @Override
+    public void migrateRelationships(final RelationshipConfiguration config) {
+        config.renameRelationship("parse failure", "parse.failure");
+    }
 
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
         final Region region = RegionUtil.getRegion(context);
         final AwsCredentialsProvider credentialsProvider = 
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
                 
.asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+        final String endpointOverride = 
context.getProperty(ENDPOINT_OVERRIDE).getValue();
 
-        kinesisClient = KinesisAsyncClient.builder()
-                .region(region)
-                .credentialsProvider(credentialsProvider)
-                .endpointOverride(getKinesisEndpointOverride())
-                .httpClient(createKinesisHttpClient(context))
+        final ClientOverrideConfiguration clientConfig = 
ClientOverrideConfiguration.builder()
+                .apiCallTimeout(API_CALL_TIMEOUT)
+                .apiCallAttemptTimeout(API_CALL_ATTEMPT_TIMEOUT)
                 .build();
 
-        dynamoDbClient = DynamoDbAsyncClient.builder()
+        final KinesisClientBuilder kinesisBuilder = KinesisClient.builder()
                 .region(region)
                 .credentialsProvider(credentialsProvider)
-                .endpointOverride(getDynamoDbEndpointOverride())
-                .httpClient(createHttpClientBuilder(context).build())
-                .build();
+                .overrideConfiguration(clientConfig);
 
-        cloudWatchClient = CloudWatchAsyncClient.builder()
+        final DynamoDbClientBuilder dynamoBuilder = DynamoDbClient.builder()
                 .region(region)
                 .credentialsProvider(credentialsProvider)
-                .endpointOverride(getCloudwatchEndpointOverride())
-                .httpClient(createHttpClientBuilder(context).build())
-                .build();
-
-        streamName = context.getProperty(STREAM_NAME).getValue();
-        final InitialPositionInStreamExtended initialPositionExtended = 
getInitialPosition(context);
-        final SingleStreamTracker streamTracker = new 
SingleStreamTracker(streamName, initialPositionExtended);
-
-        final long maxBytesToBuffer = 
context.getProperty(MAX_BYTES_TO_BUFFER).asDataSize(DataUnit.B).longValue();
-        final Duration checkpointInterval = 
context.getProperty(CHECKPOINT_INTERVAL).asDuration();
-        final MemoryBoundRecordBuffer memoryBoundRecordBuffer = new 
MemoryBoundRecordBuffer(getLogger(), maxBytesToBuffer, checkpointInterval);
-        recordBuffer = memoryBoundRecordBuffer;
-        final ShardRecordProcessorFactory recordProcessorFactory = () -> new 
ConsumeKinesisRecordProcessor(memoryBoundRecordBuffer);
-
-        final String applicationName = 
context.getProperty(APPLICATION_NAME).getValue();
-        final String workerId = generateWorkerId();
-        final ConfigsBuilder configsBuilder = new 
ConfigsBuilder(streamTracker, applicationName, kinesisClient, dynamoDbClient, 
cloudWatchClient, workerId, recordProcessorFactory);
-
-        final MetricsFactory metricsFactory = configureMetricsFactory(context);
-        final RetrievalSpecificConfig retrievalSpecificConfig = 
configureRetrievalSpecificConfig(context, kinesisClient, streamName, 
applicationName);
-
-        final InitializationStateChangeListener initializationListener = new 
InitializationStateChangeListener(getLogger());
-        initialized.set(false);
-        initializationResultFuture = initializationListener.result();
-
-        kinesisScheduler = new Scheduler(
-                configsBuilder.checkpointConfig(),
-                
configsBuilder.coordinatorConfig().workerStateChangeListener(initializationListener),
-                configsBuilder.leaseManagementConfig(),
-                configsBuilder.lifecycleConfig(),
-                configsBuilder.metricsConfig().metricsFactory(metricsFactory),
-                configsBuilder.processorConfig(),
-                
configsBuilder.retrievalConfig().retrievalSpecificConfig(retrievalSpecificConfig)
-        );
-
-        final String schedulerThreadName = 
"%s-Scheduler-%s".formatted(getClass().getSimpleName(), getIdentifier());
-        final Thread schedulerThread = new Thread(kinesisScheduler, 
schedulerThreadName);
-        schedulerThread.setDaemon(true);
-        schedulerThread.start();
-        // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
+                .overrideConfiguration(clientConfig);
 
-        try {
-            final InitializationResult result = initializationResultFuture.get(
-                    
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
-            checkInitializationResult(result);
-        } catch (final TimeoutException e) {
-            // During a first run the processor will take more time to 
initialize. We return from OnSchedule and continue waiting in the onTrigger 
method.
-            getLogger().warn("Kinesis Scheduler initialization may take up to 
10 minutes on a first run, which is caused by AWS resources initialization");
-        } catch (final InterruptedException | ExecutionException e) {
-            if (e instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
-            cleanUpState();
-            throw new ProcessException("Initialization failed for stream 
[%s]".formatted(streamName), e);
+        if (endpointOverride != null && !endpointOverride.isEmpty()) {
+            final URI endpointUri = URI.create(endpointOverride);
+            kinesisBuilder.endpointOverride(endpointUri);
+            dynamoBuilder.endpointOverride(endpointUri);
         }
-    }
 
-    /**
-     * Creating Kinesis HTTP client, as per
-     * {@link 
software.amazon.kinesis.common.KinesisClientUtil#adjustKinesisClientBuilder(KinesisAsyncClientBuilder)}.
-     */
-    private static SdkAsyncHttpClient createKinesisHttpClient(final 
ProcessContext context) {
-        return createHttpClientBuilder(context)
-                .protocol(Protocol.HTTP2)
-                // Since we're using HTTP/2, multiple concurrent requests will 
reuse the same HTTP connection.
-                // Therefore, the number of real connections is going to be 
relatively small.
-                .maxConcurrency(Integer.MAX_VALUE)
-                .http2Configuration(Http2Configuration.builder()
-                        
.initialWindowSize(KINESIS_HTTP_CLIENT_WINDOW_SIZE_BYTES)
-                        
.healthCheckPingPeriod(KINESIS_HTTP_HEALTH_CHECK_PERIOD)
-                        .build())
-                .build();
-    }
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(context);
 
-    private static NettyNioAsyncHttpClient.Builder 
createHttpClientBuilder(final ProcessContext context) {
-        final NettyNioAsyncHttpClient.Builder builder = 
NettyNioAsyncHttpClient.builder()
-                .connectionTimeout(HTTP_CLIENTS_CONNECTION_TIMEOUT)
-                .readTimeout(HTTP_CLIENTS_READ_TIMEOUT);
+        kinesisHttpClient = buildApacheHttpClient(proxyConfig, 
PollingKinesisClient.MAX_CONCURRENT_FETCHES + 10);
+        dynamoHttpClient = buildApacheHttpClient(proxyConfig, 50);
+        kinesisBuilder.httpClient(kinesisHttpClient);
+        dynamoBuilder.httpClient(dynamoHttpClient);
+
+        kinesisClient = kinesisBuilder.build();
+        dynamoDbClient = dynamoBuilder.build();
+
+        final String checkpointTableName = 
context.getProperty(APPLICATION_NAME).getValue();
+        streamName = context.getProperty(STREAM_NAME).getValue();
+        maxRecordsPerRequest = 
context.getProperty(MAX_RECORDS_PER_REQUEST).asInteger();
+        initialStreamPosition = 
context.getProperty(INITIAL_STREAM_POSITION).getValue();
+        maxBatchNanos = 
context.getProperty(MAX_BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
+        maxBatchBytes = 
context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B).longValue();
+
+        shardManager = createShardManager(kinesisClient, dynamoDbClient, 
getLogger(), checkpointTableName, streamName);
+        shardManager.ensureCheckpointTableExists();
+
+        final boolean efoMode = 
ConsumerType.ENHANCED_FAN_OUT.equals(context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class));
+        consumerClient = createConsumerClient(kinesisClient, getLogger(), 
efoMode);
+
+        final Instant timestampForPosition = resolveTimestampPosition(context);
+        if (timestampForPosition != null) {
+            if (consumerClient instanceof PollingKinesisClient polling) {
+                polling.setTimestampForInitialPosition(timestampForPosition);
+            } else if (consumerClient instanceof EfoKinesisClient efo) {
+                efo.setTimestampForInitialPosition(timestampForPosition);
+            }
+        }
 
-        final ProxyConfigurationService proxyConfigService = 
context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
-        if (proxyConfigService != null) {
-            final ProxyConfiguration proxyConfig = 
proxyConfigService.getConfiguration();
+        if (efoMode) {
+            final NettyNioAsyncHttpClient.Builder nettyBuilder = 
NettyNioAsyncHttpClient.builder()
+                    .maxConcurrency(500)
+                    .connectionAcquisitionTimeout(Duration.ofSeconds(60));

Review Comment:
   It does make sense to explicitly set it here. It makes less sense for 
polling consumers. Polling consumers don't really benefit from HTTP/2, and it 
can cause issues for a lot of enterprise proxies, etc. Plus, it would require 
changing to Netty in all places, which adds further complexity. So we will use 
it here only.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to