justinwwhuang commented on code in PR #9749:
URL: https://github.com/apache/inlong/pull/9749#discussion_r1507074093
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java:
##########
@@ -17,140 +17,311 @@
package org.apache.inlong.agent.plugin.sources;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.except.FileException;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.KafkaReader;
-
-import com.google.gson.Gson;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import static
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_LINE_FILTER;
-import static
org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET;
-import static
org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_BOOTSTRAP_SERVERS;
-import static
org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_GROUP_ID;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_OFFSET;
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
import static
org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_TOPIC;
-import static
org.apache.inlong.agent.constant.TaskConstants.JOB_LINE_FILTER_PATTERN;
import static
org.apache.inlong.agent.constant.TaskConstants.JOB_OFFSET_DELIMITER;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_ID;
+import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_BOOTSTRAP_SERVERS;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_OFFSET;
/**
* kafka source, split kafka source job into multi readers
*/
public class KafkaSource extends AbstractSource {
- public static final String JOB_KAFKA_AUTO_RESETE = "auto.offset.reset";
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ private class SourceData {
+
+ private byte[] data;
+ private Long offset;
+ }
+
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaSource.class);
- private static final String JOB_KAFKAJOB_PARAM_PREFIX = "job.kafkaJob.";
- private static final String JOB_KAFKAJOB_WAIT_TIMEOUT =
"job.kafkajob.wait.timeout";
- private static final String KAFKA_COMMIT_AUTO = "enable.auto.commit";
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE,
+ 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new AgentThreadFactory("kafka-source"));
+ private BlockingQueue<KafkaSource.SourceData> queue;
+ public InstanceProfile profile;
+ private int maxPackSize;
+ private String taskId;
+ private String instanceId;
+ private String topic;
+ private Properties props = new Properties();
+ private String allPartitionOffsets;
+ Map<Integer, Long> partitionOffsets = new HashMap<>();
+ private volatile boolean running = false;
+ private volatile boolean runnable = true;
+ private volatile AtomicLong emptyCount = new AtomicLong(0);
+
+ private final Integer CACHE_QUEUE_SIZE = 100000;
+ private final Integer READ_WAIT_TIMEOUT_MS = 10;
+ private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
+
private static final String KAFKA_DESERIALIZER_METHOD =
"org.apache.kafka.common.serialization.ByteArrayDeserializer";
- private static final String KAFKA_KEY_DESERIALIZER = "key.deserializer";
- private static final String KAFKA_VALUE_DESERIALIZER =
"value.deserializer";
private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms";
- private static final Gson gson = new Gson();
- private static AtomicLong metricsIndex = new AtomicLong(0);
+ private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
+ private boolean isRealTime = false;
+ private boolean isRestoreFromDB = false;
public KafkaSource() {
}
@Override
- public List<Reader> split(TaskProfile conf) {
- List<Reader> result = new ArrayList<>();
- String filterPattern = conf.get(JOB_LINE_FILTER_PATTERN,
DEFAULT_JOB_LINE_FILTER);
-
- Properties props = new Properties();
- Map<String, String> map = gson.fromJson(conf.toJsonStr(), Map.class);
-
props.put(JOB_KAFKA_BOOTSTRAP_SERVERS.replace(JOB_KAFKAJOB_PARAM_PREFIX,
StringUtils.EMPTY),
- map.get(JOB_KAFKA_BOOTSTRAP_SERVERS));
-
- props.put(KAFKA_KEY_DESERIALIZER, KAFKA_DESERIALIZER_METHOD);
- props.put(KAFKA_VALUE_DESERIALIZER, KAFKA_DESERIALIZER_METHOD);
- // set offset
- props.put(KAFKA_COMMIT_AUTO, false);
- if
(ObjectUtils.isNotEmpty(map.get(JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET))) {
- props.put(JOB_KAFKA_AUTO_RESETE,
map.get(JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET));
- }
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- List<PartitionInfo> partitionInfoList =
consumer.partitionsFor(conf.get(JOB_KAFKA_TOPIC));
- String allPartitionOffsets = map.get(JOB_KAFKA_OFFSET);
- Long offset = null;
- String[] partitionOffsets = null;
- if (StringUtils.isNotBlank(allPartitionOffsets)) {
- // example:0#110_1#666_2#222
- partitionOffsets = allPartitionOffsets.split(JOB_OFFSET_DELIMITER);
+ public void init(InstanceProfile profile) {
+ try {
+ LOGGER.info("KafkaSource init: {}", profile.toJsonStr());
+ this.profile = profile;
+ super.init(profile);
+ String cycleUnit = profile.get(TASK_CYCLE_UNIT);
+ if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+ isRealTime = true;
+ cycleUnit = CycleUnitType.HOUR;
+ }
+ queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+ maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+ taskId = profile.getTaskId();
+ instanceId = profile.getInstanceId();
+ topic = profile.getInstanceId();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
profile.get(TASK_KAFKA_BOOTSTRAP_SERVERS));
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, taskId);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
KAFKA_DESERIALIZER_METHOD);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KAFKA_DESERIALIZER_METHOD);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
profile.get(TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET));
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ allPartitionOffsets = profile.get(TASK_KAFKA_OFFSET);
+ isRestoreFromDB = profile.getBoolean(RESTORE_FROM_DB, false);
+ if (!isRestoreFromDB &&
StringUtils.isNotBlank(allPartitionOffsets)) {
+ // example:0#110_1#666_2#222
+ String[] offsets =
allPartitionOffsets.split(JOB_OFFSET_DELIMITER);
+ for (String offset : offsets) {
+
partitionOffsets.put(Integer.valueOf(offset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[0]),
+
Long.valueOf(offset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[1]));
+ }
+ }
+
+ EXECUTOR_SERVICE.execute(run());
+ } catch (Exception ex) {
+ stopRunning();
+ throw new FileException("error init stream for " + topic, ex);
}
- // set consumer session timeout
- props.put(KAFKA_SESSION_TIMEOUT, 30000);
- // spilt reader reduce to partition
- if (null != partitionInfoList) {
- for (PartitionInfo partitionInfo : partitionInfoList) {
-
props.put(JOB_KAFKA_GROUP_ID.replace(JOB_KAFKAJOB_PARAM_PREFIX,
StringUtils.EMPTY),
- map.getOrDefault(JOB_KAFKA_GROUP_ID,
- map.get(TASK_ID) + JOB_OFFSET_DELIMITER
- + "group" +
partitionInfo.partition()));
- KafkaConsumer<String, byte[]> partitonConsumer = new
KafkaConsumer<>(props);
- partitonConsumer.assign(Collections.singletonList(
- new TopicPartition(partitionInfo.topic(),
partitionInfo.partition())));
- // if get offset,consume from offset; if not,consume from 0
- if (partitionOffsets != null && partitionOffsets.length > 0) {
- for (String partitionOffset : partitionOffsets) {
- if
(partitionOffset.contains(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)
- &&
partitionOffset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[0]
-
.equals(String.valueOf(partitionInfo.partition()))) {
- offset =
Long.valueOf(partitionOffset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[1]);
+ }
+
+ private Runnable run() {
+ return () -> {
+ AgentThreadFactory.nameThread("kafka-source-" + taskId + "-" +
instanceId);
+ running = true;
+ try {
+ List<PartitionInfo> partitionInfoList;
+ try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props)) {
+ partitionInfoList = consumer.partitionsFor(topic);
+ }
+
+ props.put(KAFKA_SESSION_TIMEOUT, 30000);
+
+ try (KafkaConsumer<String, byte[]> kafkaConsumer = new
KafkaConsumer<>(props)) {
+ if (null != partitionInfoList) {
+ List<TopicPartition> topicPartitions = new
ArrayList<>();
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ TopicPartition topicPartition = new
TopicPartition(partitionInfo.topic(),
+ partitionInfo.partition());
+ topicPartitions.add(topicPartition);
+ }
+ kafkaConsumer.assign(topicPartitions);
+
+ if (!isRestoreFromDB &&
StringUtils.isNotBlank(allPartitionOffsets)) {
+ for (TopicPartition topicPartition :
topicPartitions) {
+ Long offset =
partitionOffsets.get(topicPartition.partition());
+ if (ObjectUtils.isNotEmpty(offset)) {
+ kafkaConsumer.seek(topicPartition, offset);
+ }
+ }
+ } else {
+ LOGGER.info("Skip to seek offset");
}
}
+ doRun(kafkaConsumer);
}
- LOGGER.info("kafka topic partition offset:{}", offset);
- if (offset != null) {
- // if offset not null,then consume from the offset
- partitonConsumer.seek(new
TopicPartition(partitionInfo.topic(), partitionInfo.partition()), offset);
+ } catch (Throwable e) {
+ LOGGER.error("do run error maybe topic is configured
incorrectly: ", e);
+ }
+ running = false;
+ };
+ }
+
+ private void doRun(KafkaConsumer<String, byte[]> kafkaConsumer) {
+ long lastPrintTime = 0;
+ while (isRunnable()) {
+ ConsumerRecords<String, byte[]> records =
kafkaConsumer.poll(Duration.ofMillis(1000));
+ if (records.isEmpty()) {
+ if (queue.isEmpty()) {
+ emptyCount.incrementAndGet();
+ } else {
+ emptyCount.set(0);
}
- KafkaReader<String, byte[]> kafkaReader = new
KafkaReader<>(partitonConsumer, map);
- addValidator(filterPattern, kafkaReader);
- result.add(kafkaReader);
+ AgentUtils.silenceSleepInSeconds(1);
+ continue;
+ }
+ emptyCount.set(0);
+ long offset = 0L;
+ for (ConsumerRecord<String, byte[]> record : records) {
+ LOGGER.info("record: {}", record.value());
+ SourceData sourceData = new SourceData(record.value(),
record.offset());
+ putIntoQueue(sourceData);
Review Comment:
Before the record is put into the queue, a semaphore permit must be
acquired through MemoryManager.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]