This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to
refs/heads/sharded_consumer_type_support_with_kinesis by this push:
new c35d4a6 Consumer tweaks to get it working
c35d4a6 is described below
commit c35d4a603359d8034da92f93a8b6e6e000ea542c
Author: Neha Pawar <[email protected]>
AuthorDate: Mon Jan 4 18:01:52 2021 -0800
Consumer tweaks to get it working
---
.../protocols/SegmentCompletionProtocol.java | 19 ++++++++++++----
.../plugin/stream/kinesis/KinesisConsumer.java | 7 ++++--
.../plugin/stream/kinesis/KinesisRecordsBatch.java | 16 ++++++--------
.../org/apache/pinot/spi/stream/MessageBatch.java | 2 ++
.../pinot/spi/stream/PartitionGroupMetadata.java | 25 +++++++++++-----------
5 files changed, 41 insertions(+), 28 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 04f300b..dd1330d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -180,6 +183,15 @@ public class SegmentCompletionProtocol {
}
public String getUrl(String hostPort, String protocol) {
+ String streamPartitionMsgOffset;
+ try {
+ streamPartitionMsgOffset = _params.getStreamPartitionMsgOffset() ==
null ? null :
+ URLEncoder.encode(_params.getStreamPartitionMsgOffset(),
StandardCharsets.UTF_8.toString());
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalStateException(
+ "Caught exception when encoding streamPartitionMsgOffset string: "
+ _params.getStreamPartitionMsgOffset(),
+ e);
+ }
return protocol + "://" + hostPort + "/" + _msgType + "?" +
PARAM_SEGMENT_NAME + "=" + _params.getSegmentName()
+ "&" + PARAM_OFFSET + "=" + _params.getOffset() + "&" +
PARAM_INSTANCE_ID + "=" + _params.getInstanceId() + (
_params.getReason() == null ? "" : ("&" + PARAM_REASON + "=" +
_params.getReason())) + (
@@ -190,10 +202,9 @@ public class SegmentCompletionProtocol {
+ (_params.getSegmentSizeBytes() <= 0 ? ""
: ("&" + PARAM_SEGMENT_SIZE_BYTES + "=" +
_params.getSegmentSizeBytes())) + (_params.getNumRows() <= 0 ? ""
: ("&" + PARAM_ROW_COUNT + "=" + _params.getNumRows())) +
(_params.getSegmentLocation() == null ? ""
- : ("&" + PARAM_SEGMENT_LOCATION + "=" +
_params.getSegmentLocation()))
- + (_params.getStreamPartitionMsgOffset() == null ? ""
- : ("&" + PARAM_STREAM_PARTITION_MSG_OFFSET + "=" +
_params.getStreamPartitionMsgOffset()))
- ;
+ : ("&" + PARAM_SEGMENT_LOCATION + "=" +
_params.getSegmentLocation())) + (
+ streamPartitionMsgOffset == null ? ""
+ : ("&" + PARAM_STREAM_PARTITION_MSG_OFFSET + "=" +
streamPartitionMsgOffset));
}
public static class Params {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 8ed3de7..a97f3dc 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -59,13 +59,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
}
@Override
- public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end,
int timeout) {
+ public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end,
int timeoutMs) {
List<Record> recordList = new ArrayList<>();
Future<KinesisRecordsBatch> kinesisFetchResultFuture =
_executorService.submit(() -> getResult(start, end, recordList));
try {
- return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
+ return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
return handleException((KinesisCheckpoint) start, recordList);
}
@@ -127,6 +127,9 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
}
return new KinesisRecordsBatch(recordList, next.getKey());
+ } catch (IllegalStateException e) {
+ LOG.warn("Illegal state exception, connection is broken", e);
+ return handleException(kinesisStartCheckpoint, recordList);
} catch (ProvisionedThroughputExceededException e) {
LOG.warn("The request rate for the stream is too high", e);
return handleException(kinesisStartCheckpoint, recordList);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index fb4bfb3..fdc883b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -18,9 +18,11 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -28,8 +30,8 @@ import software.amazon.awssdk.services.kinesis.model.Record;
public class KinesisRecordsBatch implements MessageBatch<byte[]> {
- private List<Record> _recordList;
- private String _shardId;
+ private final List<Record> _recordList;
+ private final String _shardId;
public KinesisRecordsBatch(List<Record> recordList, String shardId) {
_recordList = recordList;
@@ -43,12 +45,11 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> {
@Override
public byte[] getMessageAtIndex(int index) {
- return _recordList.get(index).data().asByteBuffer().array();
+ return _recordList.get(index).data().asByteArray();
}
-
@Override
public int getMessageOffsetAtIndex(int index) {
- return _recordList.get(index).data().asByteBuffer().arrayOffset();
+ return
ByteBuffer.wrap(_recordList.get(index).data().asByteArray()).arrayOffset();
}
@Override
@@ -57,11 +58,6 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> {
}
@Override
- public RowMetadata getMetadataAtIndex(int index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int
index) {
Map<String, String> shardToSequenceMap = new HashMap<>();
shardToSequenceMap.put(_shardId, _recordList.get(index).sequenceNumber());
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 3052b9e..5af72c0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.stream;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -61,6 +62,7 @@ public interface MessageBatch<T> {
* Returns the metadata associated with the message at a particular index.
This typically includes the timestamp
* when the message was ingested by the upstream stream-provider and other
relevant metadata.
*/
+ @Nullable
default RowMetadata getMetadataAtIndex(int index) {
return null;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index 7c4e3ef..aaf20b6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -36,18 +36,7 @@ public class PartitionGroupMetadata {
_sequenceNumber = sequenceNumber;
_startCheckpoint = startCheckpoint;
_endCheckpoint = endCheckpoint;
- }
-
- public void setSequenceNumber(int sequenceNumber) {
- _sequenceNumber = sequenceNumber;
- }
-
- public void setStartCheckpoint(String startCheckpoint) {
- _startCheckpoint = startCheckpoint;
- }
-
- public void setEndCheckpoint(String endCheckpoint) {
- _endCheckpoint = endCheckpoint;
+ _status = status;
}
public int getPartitionGroupId() {
@@ -58,14 +47,26 @@ public class PartitionGroupMetadata {
return _sequenceNumber;
}
+ public void setSequenceNumber(int sequenceNumber) {
+ _sequenceNumber = sequenceNumber;
+ }
+
public String getStartCheckpoint() {
return _startCheckpoint;
}
+ public void setStartCheckpoint(String startCheckpoint) {
+ _startCheckpoint = startCheckpoint;
+ }
+
public String getEndCheckpoint() {
return _endCheckpoint;
}
+ public void setEndCheckpoint(String endCheckpoint) {
+ _endCheckpoint = endCheckpoint;
+ }
+
public String getStatus() {
return _status;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]