This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e136f2d245 [INLONG-9187][Agent] Delete useless memory manager (#9188)
e136f2d245 is described below
commit e136f2d245875be72a2c1a07ecddb2e712160f1d
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Nov 1 14:05:53 2023 +0800
[INLONG-9187][Agent] Delete useless memory manager (#9188)
---
.../inlong/agent/core/task/MemoryManager.java | 115 -----------
.../inlong/agent/plugin/sinks/ProxySink.java | 216 ---------------------
.../inlong/agent/plugin/sinks/SenderManager.java | 10 -
3 files changed, 341 deletions(-)
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
deleted file mode 100644
index d67a15fb5a..0000000000
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.conf.AgentConfiguration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT;
-
-/**
- * used to limit global memory to avoid oom
- */
-public class MemoryManager {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(MemoryManager.class);
- private static volatile MemoryManager memoryManager = null;
- private final AgentConfiguration conf;
- private ConcurrentHashMap<String, Semaphore> semaphoreMap = new
ConcurrentHashMap<>();
-
- private MemoryManager() {
- this.conf = AgentConfiguration.getAgentConf();
- Semaphore semaphore = null;
- semaphore = new Semaphore(
- conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT,
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
- semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
-
- semaphore = new Semaphore(
- conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT,
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
- semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
-
- semaphore = new Semaphore(
- conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT,
DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT));
- semaphoreMap.put(AGENT_GLOBAL_CHANNEL_PERMIT, semaphore);
-
- semaphore = new Semaphore(
- conf.getInt(AGENT_GLOBAL_WRITER_PERMIT,
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
- semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
- }
-
- /**
- * manager singleton
- */
- public static MemoryManager getInstance() {
- if (memoryManager == null) {
- synchronized (MemoryManager.class) {
- if (memoryManager == null) {
- memoryManager = new MemoryManager();
- }
- }
- }
- return memoryManager;
- }
-
- public boolean tryAcquire(String semaphoreName, int permit) {
- Semaphore semaphore = semaphoreMap.get(semaphoreName);
- if (semaphore == null) {
- LOGGER.error("tryAcquire {} not exist");
- return false;
- }
- return semaphore.tryAcquire(permit);
- }
-
- public void release(String semaphoreName, int permit) {
- Semaphore semaphore = semaphoreMap.get(semaphoreName);
- if (semaphore == null) {
- LOGGER.error("release {} not exist");
- return;
- }
- semaphore.release(permit);
- }
-
- public void printDetail(String semaphoreName) {
- Semaphore semaphore = semaphoreMap.get(semaphoreName);
- if (semaphore == null) {
- LOGGER.error("printDetail {} not exist");
- return;
- }
- LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(),
semaphore.getQueueLength(),
- semaphoreName);
- }
-
- public void printAll() {
- printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT);
- printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT);
- printDetail(AGENT_GLOBAL_CHANNEL_PERMIT);
- printDetail(AGENT_GLOBAL_WRITER_PERMIT);
- }
-}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
deleted file mode 100755
index c3a5bbfc07..0000000000
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sinks;
-
-import org.apache.inlong.agent.common.AgentThreadFactory;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.core.task.MemoryManager;
-import org.apache.inlong.agent.message.BatchProxyMessage;
-import org.apache.inlong.agent.message.EndMessage;
-import org.apache.inlong.agent.message.PackProxyMessage;
-import org.apache.inlong.agent.message.ProxyMessage;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.MessageFilter;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
-import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
-
-/**
- * sink message data to inlong-dataproxy
- */
-public class ProxySink extends AbstractSink {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProxySink.class);
- private static AtomicLong index = new AtomicLong(0);
- private final ExecutorService executorService = new ThreadPoolExecutor(1,
1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(), new AgentThreadFactory("ProxySink"));
- private MessageFilter messageFilter;
- private SenderManager senderManager;
- private byte[] fieldSplitter;
- private volatile boolean shutdown = false;
- private int maxPackSize;
-
- public ProxySink() {
- }
-
- @Override
- public void write(Message message) {
- if (message == null) {
- return;
- }
- boolean suc = false;
- while (!suc) {
- suc = putInCache(message);
- if (!suc) {
- AgentUtils.silenceSleepInMs(batchFlushInterval);
- }
- }
- }
-
- private boolean putInCache(Message message) {
- try {
- if (message == null) {
- return true;
- }
- message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID,
inlongGroupId);
- message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
inlongStreamId);
- extractStreamFromMessage(message, fieldSplitter);
- if (message instanceof EndMessage) {
- // increment the count of failed sinks
- sinkMetric.sinkFailCount.incrementAndGet();
- return true;
- }
- AtomicBoolean suc = new AtomicBoolean(false);
- ProxyMessage proxyMessage = new ProxyMessage(message);
- boolean writerPermitSuc = MemoryManager.getInstance()
- .tryAcquire(AGENT_GLOBAL_WRITER_PERMIT,
message.getBody().length);
- if (!writerPermitSuc) {
- LOGGER.warn("writer tryAcquire failed");
-
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT);
- return false;
- }
- // add proxy message to cache.
- cache.compute(proxyMessage.getBatchKey(),
- (s, packProxyMessage) -> {
- if (packProxyMessage == null) {
- packProxyMessage = new
PackProxyMessage(jobInstanceId, jobConf, inlongGroupId,
- proxyMessage.getInlongStreamId());
-
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
- }
- // add message to package proxy
-
suc.set(packProxyMessage.addProxyMessage(proxyMessage));
- return packProxyMessage;
- });
- if (suc.get()) {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT,
message.getBody().length);
- // increment the count of successful sinks
- sinkMetric.sinkSuccessCount.incrementAndGet();
- } else {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT,
message.getBody().length);
- // increment the count of failed sinks
- sinkMetric.sinkFailCount.incrementAndGet();
- }
- return suc.get();
- } catch (Exception e) {
- LOGGER.error("write message to Proxy sink error", e);
- } catch (Throwable t) {
- ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
- }
- return false;
- }
-
- /**
- * extract stream id from message if message filter is presented
- */
- private void extractStreamFromMessage(Message message, byte[]
fieldSplitter) {
- if (messageFilter != null) {
- message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
- messageFilter.filterStreamId(message, fieldSplitter));
- } else {
- message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
inlongStreamId);
- }
- }
-
- /**
- * flush cache by batch
- *
- * @return thread runner
- */
- private Runnable flushCache() {
- return () -> {
- LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName);
- while (!shutdown) {
- try {
- cache.forEach((batchKey, packProxyMessage) -> {
- BatchProxyMessage batchProxyMessage =
packProxyMessage.fetchBatch();
- if (batchProxyMessage != null) {
- senderManager.sendBatch(batchProxyMessage);
- LOGGER.info("send group id {}, message key {},with
message size {}, the job id is {}, "
- + "read source is {} sendTime is {}",
inlongGroupId, batchKey,
- batchProxyMessage.getDataList().size(),
jobInstanceId, sourceName,
- batchProxyMessage.getDataTime());
- }
- });
- } catch (Exception ex) {
- LOGGER.error("error caught", ex);
- } catch (Throwable t) {
- ThreadUtils.threadThrowableHandler(Thread.currentThread(),
t);
- } finally {
- AgentUtils.silenceSleepInMs(batchFlushInterval);
- }
- }
- LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
- };
- }
-
- @Override
- public void init(JobProfile jobConf) {
- super.init(jobConf);
- this.maxPackSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
- messageFilter = initMessageFilter(jobConf);
- fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER,
DEFAULT_FIELD_SPLITTER).getBytes(
- StandardCharsets.UTF_8);
- senderManager = new SenderManager(jobConf, inlongGroupId, sourceName);
- try {
- senderManager.Start();
- executorService.execute(flushCache());
- } catch (Throwable ex) {
- LOGGER.error("error while init sender for group id {}",
inlongGroupId);
- ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
- throw new IllegalStateException(ex);
- }
- }
-
- @Override
- public void destroy() {
- LOGGER.info("destroy sink source name {}", sourceName);
- while (!sinkFinish()) {
- LOGGER.info("sourceName {} wait until cache all flushed to proxy",
sourceName);
- AgentUtils.silenceSleepInMs(batchFlushInterval);
- }
- shutdown = true;
- executorService.shutdown();
- senderManager.Stop();
- LOGGER.info("destroy sink source name {} end", sourceName);
- }
-
- /**
- * check whether all stream id messages finished
- */
- private boolean sinkFinish() {
- return cache.values().stream().allMatch(PackProxyMessage::isEmpty);
- }
-}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 554d6f5916..6b05f23fb7 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -21,12 +21,10 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.core.task.MemoryManager;
import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
@@ -53,7 +51,6 @@ import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
@@ -175,10 +172,6 @@ public class SenderManager {
while (!resendQueue.isEmpty()) {
try {
AgentSenderCallback callback = resendQueue.poll(1,
TimeUnit.SECONDS);
- if (callback != null) {
- MemoryManager.getInstance()
- .release(AGENT_GLOBAL_WRITER_PERMIT, (int)
callback.batchMessage.getTotalSize());
- }
} catch (InterruptedException e) {
LOGGER.error("clean resend queue error{}", e.getMessage());
}
@@ -324,9 +317,6 @@ public class SenderManager {
String jobId = batchMessage.getJobId();
long dataTime = batchMessage.getDataTime();
if (result != null && result.equals(SendResult.OK)) {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int)
batchMessage.getTotalSize());
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId, dataTime, msgCnt,
- batchMessage.getTotalSize());
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
PositionManager.getInstance()
.updateSinkPosition(batchMessage.getJobId(),
sourcePath, msgCnt, false);