KKcorps commented on code in PR #8609:
URL: https://github.com/apache/pinot/pull/8609#discussion_r861185288
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java:
##########
@@ -54,42 +71,56 @@ public void init(Properties props) {
.credentialsProvider(getLocalAWSCredentials(props))
.httpClientBuilder(new
ApacheSdkHttpService().createHttpClientBuilder());
} else {
- kinesisClientBuilder =
-
KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
- .credentialsProvider(DefaultCredentialsProvider.create())
- .httpClientBuilder(new
ApacheSdkHttpService().createHttpClientBuilder());
+ kinesisClientBuilder =
KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+ .credentialsProvider(DefaultCredentialsProvider.create())
+ .httpClientBuilder(new
ApacheSdkHttpService().createHttpClientBuilder());
}
if (props.containsKey(ENDPOINT)) {
String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
try {
kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new
URI(kinesisEndpoint));
} catch (URISyntaxException e) {
- throw new IllegalArgumentException("URI syntax is not correctly
specified for endpoint: "
- + kinesisEndpoint, e);
+ throw new IllegalArgumentException("URI syntax is not correctly
specified for endpoint: " + kinesisEndpoint,
+ e);
}
}
_kinesisClient = kinesisClientBuilder.build();
+
+ int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES,
DEFAULT_NUM_RETRIES));
+ long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS,
DEFAULT_RETRY_DELAY_MILLIS));
+ _retryPolicy = new FixedDelayRetryPolicy(numRetries, retryDelayMs);
} catch (Exception e) {
_kinesisClient = null;
}
}
@Override
public void produce(String topic, byte[] payload) {
- PutRecordRequest putRecordRequest =
-
PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
- .partitionKey(UUID.randomUUID().toString()).build();
- PutRecordResponse putRecordResponse =
_kinesisClient.putRecord(putRecordRequest);
+ try {
+ _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+ } catch (Exception e) {
+ LOGGER.error("Retries exhausted while pushing record in stream {}",
topic);
+ }
}
@Override
public void produce(String topic, byte[] key, byte[] payload) {
- PutRecordRequest putRecordRequest =
-
PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new
String(key))
- .build();
- PutRecordResponse putRecordResponse =
_kinesisClient.putRecord(putRecordRequest);
+ try {
+ _retryPolicy.attempt(() -> putRecord(topic, key, payload));
Review Comment:
Yep, the retryPolicy.attempt throws the exception. It is available in
`pinot-spi` `org.apache.pinot.spi.utils.retry.RetryPolicy`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]