J-HowHuang opened a new issue, #15378:
URL: https://github.com/apache/pinot/issues/15378
# How to reproduce
```bash
aws kinesis create-stream --stream-name transcript-stream --shard-count 2
aws kinesis list-shards --stream-name transcript-stream
```
Output:
```json
{
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "170141183460469231731687303715884105727"
},
"SequenceNumberRange": {
"StartingSequenceNumber":
"49661727849342039258072939847765243280241975881076047874"
}
},
{
"ShardId": "shardId-000000000001",
"HashKeyRange": {
"StartingHashKey": "170141183460469231731687303715884105728",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber":
"49661727849364340003271470470906778998514624242582028306"
}
}
]
}
```
Create a table with schema (using `pinot-admin.sh AddTable` here):
```json
{
"REALTIME": {
"tableName": "transcript_REALTIME",
"tableType": "REALTIME",
"segmentsConfig": {
"schemaName": "transcript",
"replicasPerPartition": "1",
"timeType": "MILLISECONDS",
"minimizeDataMovement": false,
"timeColumnName": "timestamp"
},
"tenants": {
"broker": "DefaultTenant",
"server": "DefaultTenant"
},
"tableIndexConfig": {
"streamConfigs": {
"streamType": "kinesis",
"stream.kinesis.topic.name": "transcript-stream",
"region": "us-east-1",
"shardIteratorType": "AFTER_SEQUENCE_NUMBER",
"stream.kinesis.consumer.type": "lowlevel",
"stream.kinesis.fetch.timeout.millis": "120000",
"stream.kinesis.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kinesis.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
"realtime.segment.flush.threshold.rows": "10",
"realtime.segment.flush.threshold.time": "6h"
},
"aggregateMetrics": false,
"enableDefaultStarTree": false,
"nullHandlingEnabled": false,
"rangeIndexVersion": 2,
"optimizeDictionary": false,
"loadMode": "MMAP",
"optimizeDictionaryType": false,
"skipSegmentPreprocess": false,
"optimizeDictionaryForMetrics": false,
"noDictionarySizeRatioThreshold": 0.85,
"enableDynamicStarTreeCreation": false,
"autoGeneratedInvertedIndex": false,
"columnMajorSegmentBuilderEnabled": true,
"createInvertedIndexDuringSegmentGeneration": false
},
"metadata": {
"customConfigs": {}
},
"isDimTable": false
}
}
```
Run
```bash
curl -X 'GET' \
'http://localhost:9000/tables/transcript_REALTIME/consumingSegmentsInfo' \
-H 'accept: application/json'
```
Response:
```json
{
"serversFailingToRespond": 2,
"serversUnparsableRespond": 0,
"_segmentToConsumingInfoMap": {
"transcript__0__0__20250326T1827Z": [],
"transcript__1__0__20250326T1827Z": []
}
}
```
Server log:
```
2025/03/26 11:27:55.710 WARN
[RealtimeSegmentDataManager_transcript__1__0__20250326T1827Z]
[grizzly-http-server-11] Cannot fetch stream offset with criteria
OffsetCriteria{_offsetType=LARGEST, _offsetString='largest'} for clientId
transcript_REALTIME-transcript-stream-1 and partitionGroupId 1 with maxWaitTime
5000
java.lang.RuntimeException: Failed to find shard for partitionId: 1
at
org.apache.pinot.plugin.stream.kinesis.KinesisStreamMetadataProvider.lambda$fetchStreamPartitionOffset$1(KinesisStreamMetadataProvider.java:105)
~[classes/:?]
at java.base/java.util.Optional.orElseThrow(Optional.java:403) ~[?:?]
at
org.apache.pinot.plugin.stream.kinesis.KinesisStreamMetadataProvider.fetchStreamPartitionOffset(KinesisStreamMetadataProvider.java:105)
~[classes/:?]
at
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.fetchStreamOffset(RealtimeSegmentDataManager.java:1790)
~[classes/:?]
at
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.fetchLatestStreamOffset(RealtimeSegmentDataManager.java:1765)
~[classes/:?]
at
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.fetchLatestStreamOffset(RealtimeSegmentDataManager.java:1770)
~[classes/:?]
at
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.getConsumerPartitionState(RealtimeSegmentDataManager.java:1010)
~[classes/:?]
at
org.apache.pinot.server.api.resources.TablesResource.getConsumingSegmentsInfo(TablesResource.java:1058)
~[classes/:?]
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:?]
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
~[?:?]
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:?]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at
org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
~[jersey-server-2.45.jar:?]
at
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146)
[jersey-server-2.45.jar:?]
at
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189)
[jersey-server-2.45.jar:?]
at
org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219)
[jersey-server-2.45.jar:?]
at
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93)
[jersey-server-2.45.jar:?]
at
org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478)
[jersey-server-2.45.jar:?]
at
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400)
[jersey-server-2.45.jar:?]
at
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
[jersey-server-2.45.jar:?]
at
org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256)
[jersey-server-2.45.jar:?]
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
[jersey-common-2.45.jar:?]
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
[jersey-common-2.45.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
[jersey-common-2.45.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
[jersey-common-2.45.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
[jersey-common-2.45.jar:?]
at
org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
[jersey-common-2.45.jar:?]
at
org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235)
[jersey-server-2.45.jar:?]
at
org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
[jersey-server-2.45.jar:?]
at
org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpContainer.service(GrizzlyHttpContainer.java:356)
[jersey-container-grizzly2-http-2.45.jar:?]
at
org.glassfish.grizzly.http.server.HttpHandler$1.run(HttpHandler.java:200)
[grizzly-http-server-2.4.4.jar:2.4.4]
at
org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:569)
[grizzly-framework-2.4.4.jar:2.4.4]
at
org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:549)
[grizzly-framework-2.4.4.jar:2.4.4]
at java.base/java.lang.Thread.run(Thread.java:842) [?:?]
```
# Suggestion
It seems the issue stems from a mismatch between the default Kinesis
shard naming (e.g. `shardId-000000000000` in this example) and the Pinot partition ID.
The API works after changing the following line:
https://github.com/apache/pinot/blob/07785bf8cff073791492f2cb1667e2d12709703b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java#L54
to
```java
public static final String SHARD_ID_PREFIX = "shardId-00000000000";
```
API response after the change (the change was applied to only one server):
```json
{
"serversFailingToRespond": 1,
"serversUnparsableRespond": 0,
"_segmentToConsumingInfoMap": {
"transcript__0__0__20250326T1827Z": [],
"transcript__1__0__20250326T1827Z": [
{
"serverName": "Server_100.114.242.49_8098",
"consumerState": "CONSUMING",
"lastConsumedTimestamp": -1,
"partitionToOffsetMap": {
"1":
"{\"shardId-000000000001\":\"49661785334262847198041109885226199023478929735110623250\"}"
},
"partitionOffsetInfo": {
"currentOffsetsMap": {
"1":
"{\"shardId-000000000001\":\"49661785334262847198041109885226199023478929735110623250\"}"
},
"latestUpstreamOffsetMap": {
"1": "{\"shardId-000000000001\":null}"
},
"recordsLagMap": {
"1": "NOT_CALCULATED"
},
"availabilityLagMsMap": {
"1": "UNKNOWN"
}
}
}
]
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]