This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 90e821c3b4 Enable kinesis tests (#15564)
90e821c3b4 is described below
commit 90e821c3b46c01fd7df4e1fabfde6892326f4ae2
Author: Krishan Goyal <[email protected]>
AuthorDate: Thu Apr 17 05:41:49 2025 +0530
Enable kinesis tests (#15564)
---
.../integration/tests/RealtimeKinesisIntegrationTest.java | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
index e2b49de5c8..1e33a0b7fe 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
@@ -82,14 +82,13 @@ import
software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.AttributeMap;
-@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag = "0.12.15")
+@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag = "2.3.2")
public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSet {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeKinesisIntegrationTest.class);
private static final LocalstackDockerAnnotationProcessor PROCESSOR = new
LocalstackDockerAnnotationProcessor();
private static final String STREAM_NAME = "kinesis-test";
private static final String STREAM_TYPE = "kinesis";
- public static final int MAX_RECORDS_TO_FETCH = Integer.MAX_VALUE;
public static final String REGION = "us-east-1";
public static final String LOCALSTACK_KINESIS_ENDPOINT =
"http://localhost:4566";
@@ -110,7 +109,7 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
private boolean _skipTestNoDockerInstalled = false;
- @BeforeClass(enabled = false)
+ @BeforeClass
public void setUp()
throws Exception {
try {
@@ -122,7 +121,7 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
throw new SkipException(e.getMessage());
}
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start the Pinot cluster
startZk();
@@ -199,7 +198,6 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_DECODER_CLASS),
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
streamConfigMap.put(KinesisConfig.REGION, REGION);
- streamConfigMap.put(KinesisConfig.MAX_RECORDS_TO_FETCH, String.valueOf(MAX_RECORDS_TO_FETCH));
streamConfigMap.put(KinesisConfig.SHARD_ITERATOR_TYPE,
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString());
streamConfigMap.put(KinesisConfig.ENDPOINT, LOCALSTACK_KINESIS_ENDPOINT);
streamConfigMap.put(KinesisConfig.ACCESS_KEY,
getLocalAWSCredentials().resolveCredentials().accessKeyId());
@@ -309,7 +307,7 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
return
StaticCredentialsProvider.create(AwsBasicCredentials.create("access",
"secret"));
}
- @Test(enabled = false)
+ @Test
public void testRecords()
throws Exception {
Assert.assertNotEquals(_totalRecordsPushedInStream, 0);
@@ -371,7 +369,7 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
}
}
- @Test(enabled = false)
+ @Test
public void testCountRecords() {
long count = getPinotConnection().execute("SELECT COUNT(*) FROM " +
getTableName()).getResultSet(0).getLong(0);
Assert.assertEquals(count, _totalRecordsPushedInStream);
@@ -434,7 +432,7 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
_h2FieldNameAndTypes.toArray(new String[_h2FieldNameAndTypes.size()]))
+ ")").execute();
}
- @AfterClass(enabled = false)
+ @AfterClass
public void tearDown()
throws Exception {
if (_skipTestNoDockerInstalled) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]