This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch datacarrier-bug in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 09c0e8e32d643eb8a4189225925389ab0e483252 Author: Wu Sheng <[email protected]> AuthorDate: Mon Dec 21 22:08:10 2020 +0800 Fix the issue that DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode. --- CHANGES.md | 1 + .../apm/commons/datacarrier/DataCarrier.java | 18 +++++++++--------- .../apm/commons/datacarrier/DataCarrierTest.java | 8 +++----- .../agent/core/remote/TraceSegmentServiceClient.java | 3 +-- .../core/kafka/KafkaTraceSegmentServiceClient.java | 3 +-- 5 files changed, 15 insertions(+), 18 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 77c01f7..43731e9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -19,6 +19,7 @@ Release Notes. * Update `byte-buddy` to 1.10.19. * Fix thrift plugin trace link broken when intermediate service does not mount agent * Fix thrift plugin collects wrong args when the method without parameter. +* Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode. #### OAP-Backend * Make meter receiver support MAL. 
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java index 3f34846..66d3ff8 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java @@ -44,10 +44,18 @@ public class DataCarrier<T> { } public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize) { + this(name, envPrefix, channelSize, bufferSize, BufferStrategy.BLOCKING); + } + + public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize, BufferStrategy strategy) { this.name = name; bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize); channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize); - channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING); + channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), strategy); + } + + public DataCarrier(int channelSize, int bufferSize, BufferStrategy strategy) { + this("DEFAULT", "DEFAULT", channelSize, bufferSize, strategy); } /** @@ -63,14 +71,6 @@ public class DataCarrier<T> { } /** - * override the strategy at runtime. Notice, {@link Channels} will override several channels one by one. - */ - public DataCarrier setBufferStrategy(BufferStrategy strategy) { - this.channels.setStrategy(strategy); - return this; - } - - /** * produce data to buffer, using the given {@link BufferStrategy}. * * @return false means produce data failure. The data will not be consumed. 
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java index a67475b..ae496a0 100644 --- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java +++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java @@ -33,7 +33,7 @@ import org.powermock.api.support.membermodification.MemberModifier; public class DataCarrierTest { @Test public void testCreateDataCarrier() throws IllegalAccessException { - DataCarrier<SampleData> carrier = new DataCarrier<>(5, 100); + DataCarrier<SampleData> carrier = new DataCarrier<>(5, 100, BufferStrategy.IF_POSSIBLE); Channels<SampleData> channels = (Channels<SampleData>) (MemberModifier.field(DataCarrier.class, "channels") .get(carrier)); @@ -42,8 +42,7 @@ public class DataCarrierTest { QueueBuffer<SampleData> buffer = channels.getBuffer(0); Assert.assertEquals(100, buffer.getBufferSize()); - Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.BLOCKING); - carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE); + Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.IF_POSSIBLE); Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy") .get(buffer), BufferStrategy.IF_POSSIBLE); @@ -81,8 +80,7 @@ public class DataCarrierTest { @Test public void testIfPossibleProduce() throws IllegalAccessException { - DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100); - carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE); + DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100, BufferStrategy.IF_POSSIBLE); for (int i = 0; i < 200; i++) { Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i))); diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java index 7ebd366..c9583cc 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java @@ -64,8 +64,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe lastLogTime = System.currentTimeMillis(); segmentUplinkedCounter = 0; segmentAbandonedCounter = 0; - carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE); - carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE); + carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE); carrier.consume(this, 1); } diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java index 36572a8..c10ebd6 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java @@ -59,8 +59,7 @@ public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<Tr @Override public void boot() { - carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE); - carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE); + carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE); carrier.consume(this, 1); producer = 
ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
