This is an automated email from the ASF dual-hosted git repository. jihoonson pushed a commit to branch kinesis-compatibility in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 1352ee1dbf69aa462fd2ac308774dd12bdc688a1 Author: Jihoon Son <jihoon...@apache.org> AuthorDate: Tue Mar 26 10:50:16 2019 -0700 Support kinesis compatibility --- .../indexing/kafka/KafkaIndexTaskIOConfig.java | 8 +- .../indexing/kinesis/KinesisIndexTaskIOConfig.java | 117 +++++++++- .../indexing/kinesis/KinesisIOConfigTest.java | 238 +++++++++++++++++++++ .../indexing/kinesis/KinesisIndexTaskTest.java | 60 +++--- .../kinesis/supervisor/KinesisSupervisorTest.java | 2 +- 5 files changed, 386 insertions(+), 39 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java index 2200e47..37366f3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java @@ -41,8 +41,10 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compabitility @JsonProperty("baseSequenceName") String baseSequenceName, // startPartitions and endPartitions exist to be able to read old ioConfigs in metadata store - @JsonProperty("startPartitions") @Nullable SeekableStreamEndSequenceNumbers<Integer, Long> startPartitions, - @JsonProperty("endPartitions") @Nullable SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions, + @JsonProperty("startPartitions") @Nullable + @Deprecated SeekableStreamEndSequenceNumbers<Integer, Long> startPartitions, + @JsonProperty("endPartitions") @Nullable + @Deprecated SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions, // startSequenceNumbers and endSequenceNumbers must be set for new versions @JsonProperty("startSequenceNumbers") @Nullable 
SeekableStreamStartSequenceNumbers<Integer, Long> startSequenceNumbers, @@ -115,6 +117,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte * {@link SeekableStreamStartSequenceNumbers} didn't exist before. */ @JsonProperty + @Deprecated public SeekableStreamEndSequenceNumbers<Integer, Long> getStartPartitions() { // Converting to start sequence numbers. This is allowed for Kafka because the start offset is always inclusive. @@ -130,6 +133,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte * old version of Druid. */ @JsonProperty + @Deprecated public SeekableStreamEndSequenceNumbers<Integer, Long> getEndPartitions() { return getEndSequenceNumbers(); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index f312dd6..e726ae2 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbe import org.joda.time.DateTime; import javax.annotation.Nullable; +import java.util.Set; public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, String> { @@ -46,6 +47,17 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St public KinesisIndexTaskIOConfig( @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, @JsonProperty("baseSequenceName") String baseSequenceName, + // below three deprecated variables exist to be able to read old ioConfigs in metadata store + @JsonProperty("startPartitions") + @Nullable + @Deprecated SeekableStreamEndSequenceNumbers<String, String> 
startPartitions, + @JsonProperty("endPartitions") + @Nullable + @Deprecated SeekableStreamEndSequenceNumbers<String, String> endPartitions, + @JsonProperty("exclusiveStartSequenceNumberPartitions") + @Nullable + @Deprecated Set<String> exclusiveStartSequenceNumberPartitions, + // startSequenceNumbers and endSequenceNumbers must be set for new versions @JsonProperty("startSequenceNumbers") SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers, @JsonProperty("endSequenceNumbers") SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers, @JsonProperty("useTransaction") Boolean useTransaction, @@ -62,17 +74,17 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St super( taskGroupId, baseSequenceName, - startSequenceNumbers, - endSequenceNumbers, + getStartSequenceNumbers(startSequenceNumbers, startPartitions, exclusiveStartSequenceNumberPartitions), + endSequenceNumbers == null ? endPartitions : endSequenceNumbers, useTransaction, minimumMessageTime, maximumMessageTime ); Preconditions.checkArgument( - endSequenceNumbers.getPartitionSequenceNumberMap() - .values() - .stream() - .noneMatch(x -> x.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)), + getEndSequenceNumbers().getPartitionSequenceNumberMap() + .values() + .stream() + .noneMatch(x -> x.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)), "End sequenceNumbers must not have the end of shard marker (EOS)" ); @@ -84,6 +96,99 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St this.deaggregate = deaggregate; } + public KinesisIndexTaskIOConfig( + int taskGroupId, + String baseSequenceName, + SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers, + SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers, + Boolean useTransaction, + DateTime minimumMessageTime, + DateTime maximumMessageTime, + String endpoint, + Integer recordsPerFetch, + Integer fetchDelayMillis, + String awsAssumedRoleArn, + 
String awsExternalId, + boolean deaggregate + ) + { + this( + taskGroupId, + baseSequenceName, + null, + null, + null, + startSequenceNumbers, + endSequenceNumbers, + useTransaction, + minimumMessageTime, + maximumMessageTime, + endpoint, + recordsPerFetch, + fetchDelayMillis, + awsAssumedRoleArn, + awsExternalId, + deaggregate + ); + } + + private static SeekableStreamStartSequenceNumbers<String, String> getStartSequenceNumbers( + @Nullable SeekableStreamStartSequenceNumbers<String, String> newStartSequenceNumbers, + @Nullable SeekableStreamEndSequenceNumbers<String, String> oldStartSequenceNumbers, + @Nullable Set<String> exclusiveStartSequenceNumberPartitions + ) + { + if (newStartSequenceNumbers == null) { + Preconditions.checkNotNull( + oldStartSequenceNumbers, + "Either startSequenceNumbers or startPartitions shoulnd't be null" + ); + + return new SeekableStreamStartSequenceNumbers<>( + oldStartSequenceNumbers.getStream(), + oldStartSequenceNumbers.getPartitionSequenceNumberMap(), + exclusiveStartSequenceNumberPartitions + ); + } else { + return newStartSequenceNumbers; + } + } + + /** + * This method is for compatibility so that newer version of KinesisIndexTaskIOConfig can be read by + * old version of Druid. Note that this method returns end sequence numbers instead of start. This is because + * {@link SeekableStreamStartSequenceNumbers} didn't exist before. + */ + @JsonProperty + @Deprecated + public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions() + { + // Converting to the old format (end sequence numbers). Exclusive start partitions are serialized separately + // by getExclusiveStartSequenceNumberPartitions().
+ final SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers = getStartSequenceNumbers(); + return new SeekableStreamEndSequenceNumbers<>( + startSequenceNumbers.getStream(), + startSequenceNumbers.getPartitionSequenceNumberMap() + ); + } + + /** + * This method is for compatibility so that newer version of KinesisIndexTaskIOConfig can be read by + * old version of Druid. + */ + @JsonProperty + @Deprecated + public SeekableStreamEndSequenceNumbers<String, String> getEndPartitions() + { + return getEndSequenceNumbers(); + } + + @JsonProperty + @Deprecated + public Set<String> getExclusiveStartSequenceNumberPartitions() + { + return getStartSequenceNumbers().getExclusivePartitions(); + } + @JsonProperty public String getEndpoint() { diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java index e0c7790..22393a8 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java @@ -19,21 +19,31 @@ package org.apache.druid.indexing.kinesis; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import
org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.indexing.IOConfig; import org.hamcrest.CoreMatchers; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import javax.annotation.Nullable; +import java.io.IOException; import java.util.Collections; +import java.util.Set; public class KinesisIOConfigTest { @@ -243,4 +253,232 @@ public class KinesisIOConfigTest exception.expectMessage(CoreMatchers.containsString("endpoint")); mapper.readValue(jsonStr, IOConfig.class); } + + @Test + public void testDeserializeToOldIoConfig() throws IOException + { + final KinesisIndexTaskIOConfig currentConfig = new KinesisIndexTaskIOConfig( + 0, + "baseSequenceName", + new SeekableStreamStartSequenceNumbers<>( + "stream", + ImmutableMap.of("1", "10L", "2", "5L"), + ImmutableSet.of("1") + ), + new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "20L", "2", "30L")), + true, + DateTimes.nowUtc(), + DateTimes.nowUtc(), + "endpoint", + 1000, + 2000, + "awsAssumedRoleArn", + "awsExternalId", + true + ); + + final byte[] json = mapper.writeValueAsBytes(currentConfig); + final ObjectMapper oldMapper = new DefaultObjectMapper(); + oldMapper.registerSubtypes(new NamedType(OldKinesisIndexTaskIoConfig.class, "kinesis")); + + final OldKinesisIndexTaskIoConfig oldConfig = (OldKinesisIndexTaskIoConfig) oldMapper.readValue( + json, + IOConfig.class + ); + + Assert.assertEquals(currentConfig.getBaseSequenceName(), oldConfig.getBaseSequenceName()); + Assert.assertEquals( + currentConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(), + oldConfig.getStartPartitions().getPartitionSequenceNumberMap() + ); + Assert.assertEquals( + currentConfig.getStartSequenceNumbers().getExclusivePartitions(), + oldConfig.getExclusiveStartSequenceNumberPartitions() + ); + 
Assert.assertEquals(currentConfig.getEndSequenceNumbers(), oldConfig.getEndPartitions()); + Assert.assertEquals(currentConfig.isUseTransaction(), oldConfig.isUseTransaction()); + Assert.assertEquals(currentConfig.getMinimumMessageTime(), oldConfig.getMinimumMessageTime()); + Assert.assertEquals(currentConfig.getMaximumMessageTime(), oldConfig.getMaximumMessageTime()); + Assert.assertEquals(currentConfig.getEndpoint(), oldConfig.getEndpoint()); + Assert.assertEquals(currentConfig.getRecordsPerFetch(), oldConfig.getRecordsPerFetch()); + Assert.assertEquals(currentConfig.getFetchDelayMillis(), oldConfig.getFetchDelayMillis()); + Assert.assertEquals(currentConfig.getAwsAssumedRoleArn(), oldConfig.getAwsAssumedRoleArn()); + Assert.assertEquals(currentConfig.getAwsExternalId(), oldConfig.getAwsExternalId()); + Assert.assertEquals(currentConfig.isDeaggregate(), oldConfig.isDeaggregate()); + } + + @Test + public void testDeserializeFromOldIoConfig() throws IOException + { + final ObjectMapper oldMapper = new DefaultObjectMapper(); + oldMapper.registerSubtypes(new NamedType(OldKinesisIndexTaskIoConfig.class, "kinesis")); + + final OldKinesisIndexTaskIoConfig oldConfig = new OldKinesisIndexTaskIoConfig( + "baseSequenceName", + new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "10L", "2", "5L")), + new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "20L", "2", "30L")), + ImmutableSet.of("1"), + true, + DateTimes.nowUtc(), + DateTimes.nowUtc(), + "endpoint", + 1000, + 2000, + "awsAssumedRoleArn", + "awsExternalId", + true + ); + + final byte[] json = oldMapper.writeValueAsBytes(oldConfig); + final KinesisIndexTaskIOConfig currentConfig = (KinesisIndexTaskIOConfig) mapper.readValue(json, IOConfig.class); + + Assert.assertNull(currentConfig.getTaskGroupId()); + Assert.assertEquals(oldConfig.getBaseSequenceName(), currentConfig.getBaseSequenceName()); + Assert.assertEquals( + oldConfig.getStartPartitions().getPartitionSequenceNumberMap(), + 
currentConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap() + ); + Assert.assertEquals( + oldConfig.getExclusiveStartSequenceNumberPartitions(), + currentConfig.getStartSequenceNumbers().getExclusivePartitions() + ); + Assert.assertEquals(oldConfig.getEndPartitions(), currentConfig.getEndSequenceNumbers()); + Assert.assertEquals(oldConfig.isUseTransaction(), currentConfig.isUseTransaction()); + Assert.assertEquals(oldConfig.getMinimumMessageTime(), currentConfig.getMinimumMessageTime()); + Assert.assertEquals(oldConfig.getMaximumMessageTime(), currentConfig.getMaximumMessageTime()); + Assert.assertEquals(oldConfig.getEndpoint(), currentConfig.getEndpoint()); + Assert.assertEquals(oldConfig.getRecordsPerFetch(), currentConfig.getRecordsPerFetch()); + Assert.assertEquals(oldConfig.getFetchDelayMillis(), currentConfig.getFetchDelayMillis()); + Assert.assertEquals(oldConfig.getAwsAssumedRoleArn(), currentConfig.getAwsAssumedRoleArn()); + Assert.assertEquals(oldConfig.getAwsExternalId(), currentConfig.getAwsExternalId()); + Assert.assertEquals(oldConfig.isDeaggregate(), currentConfig.isDeaggregate()); + } + + private static class OldKinesisIndexTaskIoConfig implements IOConfig + { + private final String baseSequenceName; + private final SeekableStreamEndSequenceNumbers<String, String> startPartitions; + private final SeekableStreamEndSequenceNumbers<String, String> endPartitions; + private final Set<String> exclusiveStartSequenceNumberPartitions; + private final boolean useTransaction; + private final Optional<DateTime> minimumMessageTime; + private final Optional<DateTime> maximumMessageTime; + private final String endpoint; + private final Integer recordsPerFetch; + private final Integer fetchDelayMillis; + + private final String awsAssumedRoleArn; + private final String awsExternalId; + private final boolean deaggregate; + + @JsonCreator + private OldKinesisIndexTaskIoConfig( + @JsonProperty("baseSequenceName") String baseSequenceName, + 
@JsonProperty("startPartitions") @Nullable SeekableStreamEndSequenceNumbers<String, String> startPartitions, + @JsonProperty("endPartitions") @Nullable SeekableStreamEndSequenceNumbers<String, String> endPartitions, + @JsonProperty("exclusiveStartSequenceNumberPartitions") Set<String> exclusiveStartSequenceNumberPartitions, + @JsonProperty("useTransaction") Boolean useTransaction, + @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, + @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, + @JsonProperty("endpoint") String endpoint, + @JsonProperty("recordsPerFetch") Integer recordsPerFetch, + @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, + @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, + @JsonProperty("awsExternalId") String awsExternalId, + @JsonProperty("deaggregate") boolean deaggregate + ) + { + this.baseSequenceName = baseSequenceName; + this.startPartitions = startPartitions; + this.endPartitions = endPartitions; + this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions; + this.useTransaction = useTransaction; + this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); + this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); + this.endpoint = endpoint; + this.recordsPerFetch = recordsPerFetch; + this.fetchDelayMillis = fetchDelayMillis; + this.awsAssumedRoleArn = awsAssumedRoleArn; + this.awsExternalId = awsExternalId; + this.deaggregate = deaggregate; + } + + @JsonProperty + public String getBaseSequenceName() + { + return baseSequenceName; + } + + @JsonProperty + public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions() + { + return startPartitions; + } + + @JsonProperty + public SeekableStreamEndSequenceNumbers<String, String> getEndPartitions() + { + return endPartitions; + } + + @JsonProperty + public Set<String> getExclusiveStartSequenceNumberPartitions() + { + return exclusiveStartSequenceNumberPartitions; + } + + @JsonProperty 
+ public boolean isUseTransaction() + { + return useTransaction; + } + + @JsonProperty + public Optional<DateTime> getMinimumMessageTime() + { + return minimumMessageTime; + } + + @JsonProperty + public Optional<DateTime> getMaximumMessageTime() + { + return maximumMessageTime; + } + + @JsonProperty + public String getEndpoint() + { + return endpoint; + } + + @JsonProperty + public int getRecordsPerFetch() + { + return recordsPerFetch; + } + + @JsonProperty + public int getFetchDelayMillis() + { + return fetchDelayMillis; + } + + @JsonProperty + public String getAwsAssumedRoleArn() + { + return awsAssumedRoleArn; + } + + @JsonProperty + public String getAwsExternalId() + { + return awsExternalId; + } + + @JsonProperty + public boolean isDeaggregate() + { + return deaggregate; + } + } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index bbdd2dd..969cc39 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -399,7 +399,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -469,7 +469,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()), new 
SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")), @@ -557,7 +557,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, baseSequenceName, startPartitions, endPartitions, @@ -683,7 +683,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, baseSequenceName, startPartitions, endPartitions, @@ -795,7 +795,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -864,7 +864,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -944,7 +944,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport ) ), new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -1015,7 +1015,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2")), @@ -1071,7 +1071,7 @@ public class KinesisIndexTaskTest extends 
EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -1140,7 +1140,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -1209,7 +1209,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")), @@ -1268,7 +1268,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "12")), @@ -1366,7 +1366,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")), @@ -1448,7 +1448,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task1 = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, 
ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -1466,7 +1466,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task2 = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -1540,7 +1540,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task1 = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -1558,7 +1558,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task2 = createTask( null, new KinesisIndexTaskIOConfig( - null, + 1, "sequence1", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")), @@ -1630,7 +1630,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task1 = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -1648,7 +1648,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task2 = createTask( null, new KinesisIndexTaskIOConfig( - null, + 1, "sequence1", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")), @@ -1724,7 +1724,7 @@ public class 
KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence1", new SeekableStreamStartSequenceNumbers<>( stream, @@ -1808,7 +1808,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task1 = createTask( null, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -1826,7 +1826,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task2 = createTask( null, new KinesisIndexTaskIOConfig( - null, + 1, "sequence1", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")), @@ -1901,7 +1901,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task1 = createTask( "task1", new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")), @@ -1950,7 +1950,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task2 = createTask( task1.getId(), new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")), @@ -2034,7 +2034,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task1 = createTask( "task1", new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new 
SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")), @@ -2095,7 +2095,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task2 = createTask( task1.getId(), new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")), @@ -2160,7 +2160,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport final KinesisIndexTask task = createTask( "task1", new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "13")), @@ -2283,7 +2283,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport "task1", DATA_SCHEMA, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")), @@ -2380,7 +2380,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport ImmutableMap.of(shardId1, "100") // simulating unlimited ); final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig( - null, + 0, baseSequenceName, startPartitions, endPartitions, @@ -2493,7 +2493,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport "task1", DATA_SCHEMA, new KinesisIndexTaskIOConfig( - null, + 0, "sequence0", new SeekableStreamStartSequenceNumbers<>( stream, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 73c1d46..4b138f6 100644 --- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -3550,7 +3550,7 @@ public class KinesisSupervisorTest extends EasyMockSupport getDataSchema(dataSource), tuningConfig, new KinesisIndexTaskIOConfig( - null, + 0, "sequenceName-" + taskGroupId, startPartitions, endPartitions, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org