0AyanamiRei commented on code in PR #61325: URL: https://github.com/apache/doris/pull/61325#discussion_r3041769478
########## fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisDataSourceProperties.java: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload.kinesis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.routineload.AbstractDataSourceProperties; +import org.apache.doris.load.routineload.LoadDataSourceType; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TimeZone; + +/** + * AWS Kinesis data source properties for Routine Load. + * + * Parameters: + * - kinesis_stream: Name of the Kinesis stream (required) + * - kinesis_shards: Comma-separated list of shard IDs (optional) + * - kinesis_shards_pos: Comma-separated list of positions for each shard (optional) + * - aws.region: AWS region (required) + * - aws.access_key: AWS access key (optional) + * - aws.secret_key: AWS secret key (optional) + * - aws.session_key: AWS session token (optional) + * - aws.role_arn: IAM role ARN (optional) + * - property.kinesis_default_pos: Default position for new shards (optional) + * - property.*: Other pass-through parameters for AWS SDK configuration + * + * Example usage in SQL: + * CREATE ROUTINE LOAD my_job ON my_table + * FROM KINESIS ( + * "aws.region" = "us-east-1", + * "aws.access_key" = "AKIAIOSFODNN7EXAMPLE", + * "aws.secret_key" = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + * "kinesis_stream" = "my-stream", + * "property.kinesis_default_pos" = "TRIM_HORIZON" + * ); + */ +public class KinesisDataSourceProperties extends AbstractDataSourceProperties { + + /** + * List of shard IDs with their starting sequence numbers. + * Pair<ShardId, SequenceNumber> + * SequenceNumber can be: + * - Actual sequence number string + * - TRIM_HORIZON_VAL (-2) for oldest record + * - LATEST_VAL (-1) for newest record + * - Timestamp value for AT_TIMESTAMP + */ + @Getter + @Setter + @SerializedName(value = "kinesisShardPositions") + private List<Pair<String, String>> kinesisShardPositions = Lists.newArrayList(); + + /** + * Custom Kinesis properties for advanced configuration. + * Includes AWS credentials and client configuration. + */ + @Getter + @SerializedName(value = "customKinesisProperties") + private Map<String, String> customKinesisProperties; + + /** + * Whether positions are specified as timestamps. + */ + @Getter + @SerializedName(value = "isPositionsForTimes") + private boolean isPositionsForTimes = false; + + /** + * AWS region for the Kinesis stream. + */ + @Getter + @SerializedName(value = "region") + private String region; + + /** + * Name of the Kinesis stream. + */ + @Getter + @SerializedName(value = "stream") + private String stream; + + /** + * Optional endpoint URL for custom endpoints. + */ + @Getter + @SerializedName(value = "endpoint") + private String endpoint; + + // Standard position constants (similar to Kafka's OFFSET_BEGINNING/OFFSET_END) + public static final String POSITION_TRIM_HORIZON = "TRIM_HORIZON"; + public static final String POSITION_LATEST = "LATEST"; + public static final String POSITION_AT_TIMESTAMP = "AT_TIMESTAMP"; + + // Configurable data source properties that can be set by user + private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = + new ImmutableSet.Builder<String>() + .add(KinesisConfiguration.KINESIS_REGION.getName()) + .add(KinesisConfiguration.KINESIS_STREAM.getName()) + .add(KinesisConfiguration.KINESIS_SHARDS.getName()) + .add(KinesisConfiguration.KINESIS_POSITIONS.getName()) + .add(KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName()) + .add(KinesisConfiguration.KINESIS_ACCESS_KEY.getName()) + .add(KinesisConfiguration.KINESIS_SECRET_KEY.getName()) + .add(KinesisConfiguration.KINESIS_SESSION_TOKEN.getName()) + .add(KinesisConfiguration.KINESIS_ROLE_ARN.getName()) + .build(); + + public KinesisDataSourceProperties(Map<String, String> dataSourceProperties, boolean multiLoad) { + super(dataSourceProperties, multiLoad); + } + + public KinesisDataSourceProperties(Map<String, String> originalDataSourceProperties) { + super(originalDataSourceProperties); + } + + @Override + protected String getDataSourceType() { + return LoadDataSourceType.KINESIS.name(); + } + + @Override + protected List<String> getRequiredProperties() { + return Arrays.asList( + KinesisConfiguration.KINESIS_REGION.getName(), + KinesisConfiguration.KINESIS_STREAM.getName() + ); + } + + @Override + public void convertAndCheckDataSourceProperties() throws UserException { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
