http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java new file mode 100644 index 0000000..219ee6e --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import java.util.Arrays; +import java.util.Calendar; +import java.util.LinkedList; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; + +public class StatsItem { + private final AtomicLong value = new AtomicLong(0); + private final AtomicLong times = new AtomicLong(0); + private final AtomicLong[] valueIncDistributeRegion = new AtomicLong[10]; + private final AtomicLong valueMaxInMinutes = new AtomicLong(0); + private final AtomicLong valueMaxIn10Minutes = new AtomicLong(0); + private final AtomicLong valueMaxInHour = new AtomicLong(0); + + private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>(); + private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>(); + private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>(); + + private final ScheduledExecutorService scheduledExecutorService; + private final String statsName; + private final String statsKey; + private final Logger log; + + public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService, Logger log) { + this.statsName = statsName; + this.statsKey = statsKey; + this.scheduledExecutorService = scheduledExecutorService; + this.log = log; + + for (int i = 0; i < this.valueIncDistributeRegion.length; i++) { + valueIncDistributeRegion[i] = new AtomicLong(0); + } + } + + public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) { + long prev = target.get(); + while (value > prev) { + boolean updated = target.compareAndSet(prev, value); + if (updated) + return true; + + prev = target.get(); + } + + return false; + } + + private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) { + StatsSnapshot statsSnapshot = new StatsSnapshot(); + synchronized (csList) { + double tps = 0; + double avgpt = 0; + long sum = 0; + if (!csList.isEmpty()) { + CallSnapshot first = csList.getFirst(); + CallSnapshot last = csList.getLast(); + sum = last.getValue() - first.getValue(); + tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp()); + + long timesDiff = last.getTimes() - first.getTimes(); + if (timesDiff > 0) { + avgpt = (sum * 1.0d) / timesDiff; + } + + } + + statsSnapshot.setSum(sum); + statsSnapshot.setTps(tps); + statsSnapshot.setAvgpt(avgpt); + } + + return statsSnapshot; + } + + public static long computNextMinutesTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DAY_OF_MONTH, 0); + cal.add(Calendar.HOUR_OF_DAY, 0); + cal.add(Calendar.MINUTE, 1); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + public static long computNextHourTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DAY_OF_MONTH, 0); + cal.add(Calendar.HOUR_OF_DAY, 1); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + public static long computNextMorningTimeMillis() { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DAY_OF_MONTH, 1); + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTimeInMillis(); + } + + public void addValue(final int incValue, final int incTimes) { + this.value.addAndGet(incValue); + this.times.addAndGet(incTimes); + this.setValueIncDistributeRegion(incValue); + StatsItem.compareAndIncreaseOnly(this.valueMaxInMinutes, incValue); + StatsItem.compareAndIncreaseOnly(this.valueMaxIn10Minutes, incValue); + StatsItem.compareAndIncreaseOnly(this.valueMaxInHour, incValue); + } + + public StatsSnapshot getStatsDataInMinute() { + return computeStatsData(this.csListMinute); + } + + public StatsSnapshot getStatsDataInHour() { + return computeStatsData(this.csListHour); + } + + public StatsSnapshot getStatsDataInDay() { + return computeStatsData(this.csListDay); + } + + public void init() { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInSeconds(); + } catch (Throwable ignored) { + } + } + }, 0, 10, TimeUnit.SECONDS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInMinutes(); + } catch (Throwable ignored) { + } + + } + }, Math.abs(StatsItem.computNextMinutesTimeMillis() - System.currentTimeMillis()), // + 1000 * 60 * 10, TimeUnit.MILLISECONDS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInHour(); + } catch (Throwable ignored) { + } + } + }, 0, 1, TimeUnit.HOURS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtMinutes(); + } catch (Throwable ignored) { + } + } + }, Math.abs(StatsItem.computNextMinutesTimeMillis() - System.currentTimeMillis()), // + 1000 * 60, TimeUnit.MILLISECONDS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtHour(); + } catch (Throwable ignored) { + } + + } + }, Math.abs(StatsItem.computNextHourTimeMillis() - System.currentTimeMillis()), // + 1000 * 60 * 60, TimeUnit.MILLISECONDS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtDay(); + } catch (Throwable ignored) { + } + } + }, Math.abs(StatsItem.computNextMorningTimeMillis() - System.currentTimeMillis()) - 2000, // + 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS); + } + + public void samplingInSeconds() { + synchronized (this.csListMinute) { + this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value.get())); + if (this.csListMinute.size() > 7) { + this.csListMinute.removeFirst(); + } + } + } + + public void samplingInMinutes() { + synchronized (this.csListHour) { + this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value.get())); + if (this.csListHour.size() > 7) { + this.csListHour.removeFirst(); + } + } + + valueMaxIn10Minutes.set(0); + } + + public void samplingInHour() { + synchronized (this.csListDay) { + this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value.get())); + if (this.csListDay.size() > 25) { + this.csListDay.removeFirst(); + } + } + } + + public void printAtMinutes() { + StatsSnapshot ss = computeStatsData(this.csListMinute); + log.info(String + .format( + "[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.4f valueMaxInMinutes: %d valueMaxIn10Minutes: %d valueMaxInHour: %d valueIncDistributeRegion: %s", + this.statsName, + this.statsKey, + ss.getSum(), + ss.getTps(), + ss.getAvgpt(), + this.valueMaxInMinutes.get(), + this.valueMaxIn10Minutes.get(), + this.valueMaxInHour.get(), + Arrays.toString(valueRegion()) + )); + + valueMaxInMinutes.set(0); + } + + public void printAtHour() { + StatsSnapshot ss = computeStatsData(this.csListHour); + log.info(String + .format( + "[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.4f valueMaxInMinutes: %d valueMaxIn10Minutes: %d valueMaxInHour: %d valueIncDistributeRegion: %s", + this.statsName, + this.statsKey, + ss.getSum(), + ss.getTps(), + ss.getAvgpt(), + this.valueMaxInMinutes.get(), + this.valueMaxIn10Minutes.get(), + this.valueMaxInHour.get(), + Arrays.toString(valueRegion()) + )); + + valueMaxInHour.set(0); + } + + public void printAtDay() { + StatsSnapshot ss = computeStatsData(this.csListDay); + log.info(String.format( + "[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.4f valueMaxInMinutes: %d valueMaxIn10Minutes: %d valueMaxInHour: %d valueIncDistributeRegion: %s", + this.statsName, + this.statsKey, + ss.getSum(), + ss.getTps(), + ss.getAvgpt(), + this.valueMaxInMinutes.get(), + this.valueMaxIn10Minutes.get(), + this.valueMaxInHour.get(), + Arrays.toString(valueRegion()) + )); + } + + long[] valueRegion() { + long[] vrs = new long[this.valueIncDistributeRegion.length]; + for (int i = 0; i < this.valueIncDistributeRegion.length; i++) { + vrs[i] = this.valueIncDistributeRegion[i].get(); + } + return vrs; + } + + public AtomicLong getValue() { + return value; + } + + public String getStatsName() { + return statsName; + } + + public AtomicLong getTimes() { + return times; + } + + public AtomicLong[] getValueDistributeRegion() { + return valueIncDistributeRegion; + } + + public AtomicLong[] getValueIncDistributeRegion() { + return valueIncDistributeRegion; + } + + private void setValueIncDistributeRegion(long value) { + // < 1ms + if (value <= 0) { + this.valueIncDistributeRegion[0].incrementAndGet(); + } + // 1ms ~ 10ms + else if (value < 10) { + this.valueIncDistributeRegion[1].incrementAndGet(); + } + // 10ms ~ 100ms + else if (value < 100) { + this.valueIncDistributeRegion[2].incrementAndGet(); + } + // 100ms ~ 500ms + else if (value < 500) { + this.valueIncDistributeRegion[3].incrementAndGet(); + } + // 500ms ~ 1s + else if (value < 1000) { + this.valueIncDistributeRegion[4].incrementAndGet(); + } + // 1s ~ 3s + else if (value < 3000) { + this.valueIncDistributeRegion[5].incrementAndGet(); + } + // 3s ~ 5s + else if (value < 5000) { + this.valueIncDistributeRegion[6].incrementAndGet(); + } + // 5s ~ 10s + else if (value < 10000) { + this.valueIncDistributeRegion[7].incrementAndGet(); + } + // 10s ~ 30s + else if (value < 30000) { + this.valueIncDistributeRegion[8].incrementAndGet(); + } + // >= 30s + else { + this.valueIncDistributeRegion[9].incrementAndGet(); + } + } + + public AtomicLong getValueMaxInHour() { + return valueMaxInHour; + } + + public AtomicLong getValueMaxInMinutes() { + return valueMaxInMinutes; + } + + public AtomicLong getValueMaxIn10Minutes() { + return valueMaxIn10Minutes; + } + + public static class StatsSnapshot { + private long sum; + private double tps; + private double avgpt; + + public long getSum() { + return sum; + } + + public void setSum(long sum) { + this.sum = sum; + } + + public double getTps() { + return tps; + } + + public void setTps(double tps) { + this.tps = tps; + } + + public double getAvgpt() { + return avgpt; + } + + public void setAvgpt(double avgpt) { + this.avgpt = avgpt; + } + } + + class CallSnapshot { + private final long timestamp; + private final long times; + private final long value; + + public CallSnapshot(long timestamp, long times, long value) { + super(); + this.timestamp = timestamp; + this.times = times; + this.value = value; + } + + public long getTimestamp() { + return timestamp; + } + + public long getTimes() { + return times; + } + + public long getValue() { + return value; + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java new file mode 100644 index 0000000..79c1520 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; + +public class StatsItemSet { + private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable = new ConcurrentHashMap<String, StatsItem>(128); + private final String statsName; + private final ScheduledExecutorService scheduledExecutorService; + private final Logger log; + + public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) { + this.statsName = statsName; + this.scheduledExecutorService = scheduledExecutorService; + this.log = log; + } + + public void init() { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInSeconds(); + } catch (Throwable ignored) { + } + } + }, 0, 10, TimeUnit.SECONDS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInMinutes(); + } catch (Throwable ignored) { + } + } + }, 0, 10, TimeUnit.MINUTES); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + samplingInHour(); + } catch (Throwable ignored) { + } + } + }, 0, 1, TimeUnit.HOURS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtMinutes(); + } catch (Throwable ignored) { + } + } + }, Math.abs(StatsItem.computNextMinutesTimeMillis() - System.currentTimeMillis()), // + 1000 * 60, TimeUnit.MILLISECONDS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtHour(); + } catch (Throwable ignored) { + } + } + }, Math.abs(StatsItem.computNextHourTimeMillis() - System.currentTimeMillis()), // + 1000 * 60 * 60, TimeUnit.MILLISECONDS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtDay(); + } catch (Throwable ignored) { + } + } + }, Math.abs(StatsItem.computNextMorningTimeMillis() - System.currentTimeMillis()), // + 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS); + } + + private void samplingInSeconds() { + for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) { + next.getValue().samplingInSeconds(); + } + } + + private void samplingInMinutes() { + for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) { + next.getValue().samplingInMinutes(); + } + } + + private void samplingInHour() { + for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) { + next.getValue().samplingInHour(); + } + } + + private void printAtMinutes() { + for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) { + next.getValue().printAtMinutes(); + } + } + + private void printAtHour() { + for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) { + next.getValue().printAtHour(); + } + } + + private void printAtDay() { + for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) { + next.getValue().printAtDay(); + } + } + + void addValue(final String statsKey, final int incValue, final int incTimes) { + this.getAndCreateStatsItem(statsKey).addValue(incValue, incTimes); + } + + private StatsItem getAndCreateStatsItem(final String statsKey) { + StatsItem statsItem = this.statsItemTable.get(statsKey); + if (null == statsItem) { + statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); + StatsItem prev = this.statsItemTable.put(statsKey, statsItem); + if (null == prev) { + // statsItem.init(); + } + } + + return statsItem; + } + + public StatsItem.StatsSnapshot getStatsDataInMinute(final String statsKey) { + StatsItem statsItem = this.statsItemTable.get(statsKey); + if (null != statsItem) { + return statsItem.getStatsDataInMinute(); + } + return new StatsItem.StatsSnapshot(); + } + + public StatsItem.StatsSnapshot getStatsDataInHour(final String statsKey) { + StatsItem statsItem = this.statsItemTable.get(statsKey); + if (null != statsItem) { + return statsItem.getStatsDataInHour(); + } + + return new StatsItem.StatsSnapshot(); + } + + public StatsItem.StatsSnapshot getStatsDataInDay(final String statsKey) { + StatsItem statsItem = this.statsItemTable.get(statsKey); + if (null != statsItem) { + return statsItem.getStatsDataInDay(); + } + return new StatsItem.StatsSnapshot(); + } + + public StatsItem getStatsItem(final String statsKey) { + return this.statsItemTable.get(statsKey); + } + + ConcurrentHashMap<String, StatsItem> getStatsItemTable() { + return statsItemTable; + } + + public String getStatsName() { + return statsName; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java new file mode 100644 index 0000000..540779e --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +public class ThreadStats { + private final ConcurrentHashMap<Threading, TimestampRegion> statsTable = new ConcurrentHashMap<Threading, TimestampRegion>(64); + + public void beginInvoke(final long beginTimestamp) { + Threading th = new Threading(Thread.currentThread().getName(), Thread.currentThread().getId()); + + TimestampRegion tr = this.statsTable.get(th); + if (null == tr) { + tr = new TimestampRegion(); + this.statsTable.put(th, tr); + } + + tr.setBeginTimestamp(beginTimestamp); + tr.setEndTimestamp(-1); + } + + public void endInvoke(final long endTimestamp) { + Threading th = new Threading(Thread.currentThread().getName(), Thread.currentThread().getId()); + TimestampRegion tr = this.statsTable.get(th); + tr.setEndTimestamp(endTimestamp); + } + + public TreeMap<Threading, TimestampRegion> cloneStatsTable() { + TreeMap<Threading, TimestampRegion> result = new TreeMap<Threading, TimestampRegion>(); + + for (final Map.Entry<Threading, TimestampRegion> next : this.statsTable.entrySet()) { + result.put(next.getKey(), next.getValue()); + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java new file mode 100644 index 0000000..e2c19b5 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +public class Threading implements Comparable { + private String name; + private long id; + + public Threading() { + } + + public Threading(String name, long id) { + this.name = name; + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + @Override + public int hashCode() { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (int) (id ^ (id >>> 32)); + return result; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + Threading threading = (Threading) o; + + return id == threading.id && !(name != null ? !name.equals(threading.name) : threading.name != null); + + } + + @Override + public String toString() { + return "Threading{" + + "name='" + name + '\'' + + ", id=" + id + + '}'; + } + + @Override + public int compareTo(Object o) { + Threading t = (Threading) o; + int ret = t.name.compareTo(this.name); + if (ret == 0) { + return Long.valueOf(t.id).compareTo(this.id); + } + + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java new file mode 100644 index 0000000..cf8042b --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +public class TimestampRegion { + private volatile long beginTimestamp = -1; + private volatile long endTimestamp = -1; + + public long getBeginTimestamp() { + return beginTimestamp; + } + + public void setBeginTimestamp(long beginTimestamp) { + this.beginTimestamp = beginTimestamp; + } + + public long getEndTimestamp() { + return endTimestamp; + } + + public void setEndTimestamp(long endTimestamp) { + this.endTimestamp = endTimestamp; + } + + @Override + public int hashCode() { + int result = (int) (beginTimestamp ^ (beginTimestamp >>> 32)); + result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32)); + return result; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + TimestampRegion that = (TimestampRegion) o; + return beginTimestamp == that.beginTimestamp && endTimestamp == that.endTimestamp; + } + + @Override + public String toString() { + return "TimestampRegion{" + + "beginTimestamp=" + beginTimestamp + + ", endTimestamp=" + endTimestamp + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java new file mode 100644 index 0000000..376ccda --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import java.util.Map; + +public class UtilAll { + + public static String jstack() { + return jstack(Thread.getAllStackTraces()); + } + + private static String jstack(Map<Thread, StackTraceElement[]> map) { + StringBuilder result = new StringBuilder(); + try { + for (final Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) { + StackTraceElement[] elements = entry.getValue(); + Thread thread = entry.getKey(); + if (elements != null && elements.length > 0) { + String threadName = entry.getKey().getName(); + result.append(String.format("%-40s TID: %d STATE: %s\n", threadName, thread.getId(), thread.getState())); + for (StackTraceElement el : elements) { + result.append(String.format("%-40s %s\n", threadName, el.toString())); + } + result.append("\n"); + } + } + } catch (Throwable ignored) { + } + + return result.toString(); + } + + public static String jstack(final String threadName, final long threadId) { + Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); + StringBuilder result = new StringBuilder(); + try { + for (final Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) { + StackTraceElement[] elements = entry.getValue(); + Thread thread = entry.getKey(); + if (elements != null && elements.length > 0) { + if (threadName.equals(entry.getKey().getName()) && threadId == entry.getKey().getId()) { + result.append(String.format("%-40s TID: %d STATE: %s\n", threadName, thread.getId(), thread.getState())); + for (StackTraceElement el : elements) { + result.append(String.format("%-40s %s\n", threadName, el.toString())); + } + } + } + } + } catch (Throwable ignored) { + } + return result.toString(); + } + + public static ExecuteResult callShellCommand(final String shellString) { + Process process = null; + try { + String[] cmdArray = shellString.split(" "); + process = Runtime.getRuntime().exec(cmdArray); + process.waitFor(); + } catch (Throwable ignored) { + } finally { + if (null != process) + process.destroy(); + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java new file mode 100644 index 0000000..e7330b8 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.processor; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import org.apache.rocketmq.remoting.api.RequestProcessor; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.serializable.Serializer; +import org.apache.rocketmq.rpc.annotation.RemoteService; +import org.apache.rocketmq.rpc.impl.command.ResponseCode; +import org.apache.rocketmq.rpc.impl.context.RpcProviderContext; +import org.apache.rocketmq.rpc.impl.exception.ServiceExceptionInvokeMessage; +import org.apache.rocketmq.rpc.impl.metrics.ServiceStats; +import org.apache.rocketmq.rpc.impl.service.RpcEntry; +import org.apache.rocketmq.rpc.impl.service.RpcInstanceAbstract; +import org.apache.rocketmq.rpc.impl.service.RpcServiceCallBody; +import org.apache.rocketmq.rpc.internal.ServiceUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RpcRequestProcessor implements RequestProcessor { + private static final Logger log = LoggerFactory.getLogger(RpcRequestProcessor.class); + private Map<String, RpcEntry> serviceTable = new ConcurrentHashMap<>(64); + private Map<String, ExecutorService> executorTable = new ConcurrentHashMap<>(64); + private ThreadLocal<RpcProviderContext> threadLocalProviderContext; + private RpcInstanceAbstract rpcInstanceAbstract; + private ServiceStats serviceStats; + + public RpcRequestProcessor(ThreadLocal<RpcProviderContext> threadLocalProviderContext, + final RpcInstanceAbstract rpcInstanceAbstract, ServiceStats stats) { + this.threadLocalProviderContext = threadLocalProviderContext; + this.rpcInstanceAbstract = rpcInstanceAbstract; + this.serviceStats = stats; + } + + public Set<String> putNewService(final Object obj) { + return putNewService(obj, null); + } + + public Set<String> putNewService(final Object obj, final ExecutorService executorService) { + Class<?>[] interfaces = obj.getClass().getInterfaces(); + for (Class<?> itf : interfaces) { + RemoteService serviceExport = itf.getAnnotation(RemoteService.class); + if (null == serviceExport) { + log.warn("Service:{} is not remark annotation", itf.getName()); + continue; + } + + Method[] methods = itf.getMethods(); + for (Method method : methods) { + if (!ServiceUtil.testServiceExportMethod(method)) { + log.error("The method: [{}] not matched RPC standard", method.toGenericString()); + continue; + } + + String requestCode = ServiceUtil.toRequestCode(serviceExport, method); + RpcEntry se = new RpcEntry(); + se.setServiceExport(serviceExport); + se.setObject(obj); + se.setMethod(method); + this.serviceTable.put(requestCode, se); + if (executorService != null) { + this.executorTable.put(requestCode, executorService); + } + } + } + return this.serviceTable.keySet(); + } + + @Override + public RemotingCommand processRequest(RemotingChannel channel, RemotingCommand request) { + RpcProviderContext rpcProviderContext = new RpcProviderContext(); + rpcProviderContext.setRemotingChannel(channel); + rpcProviderContext.setRemotingRequest(request); + rpcProviderContext.setRemotingResponse(rpcInstanceAbstract.remotingService().commandFactory().createResponse(request)); + rpcProviderContext.setReturnResponse(true); + threadLocalProviderContext.set(rpcProviderContext); + this.processRequest0(rpcProviderContext, request); + threadLocalProviderContext.remove(); + + if (rpcProviderContext.isReturnResponse()) { + return rpcProviderContext.getRemotingResponse(); + } + return null; + } + + private Object[] buildParameter(final RpcProviderContext context, final RemotingCommand request, + RpcServiceCallBody serviceCallBody, Type[] parameterTypes) { + Object[] parameters = new Object[parameterTypes.length]; + try { + int index = 0; + Serializer serialization = this.rpcInstanceAbstract.remotingService() + .serializerFactory().get(request.serializerType()); + for (Type parameterType : parameterTypes) { + parameters[index] = serialization.decode(serviceCallBody.getParameter(index), parameterType); + index++; + } + } catch (Exception e) { + ServiceExceptionInvokeMessage serviceExceptionInvokeMessage = new ServiceExceptionInvokeMessage(); + serviceExceptionInvokeMessage.setClassFullName(e.getClass().getName()); + serviceExceptionInvokeMessage.setErrorMessage(serializeException(request.serializerType(), e)); + context.getRemotingResponse().opCode(ResponseCode.PARAMETER_ERROR); + context.getRemotingResponse().parameter(serviceExceptionInvokeMessage); + } + return parameters; + } + + private String serializeException(byte serializeType, Exception exception) { + Serializer serialization = rpcInstanceAbstract.remotingService().serializerFactory().get(serializeType); + return serialization.encode(exception).toString(); + } + + private void dealWithException(final RpcProviderContext context, RemotingCommand request, Exception exception) { + if (exception instanceof InvocationTargetException) { + context.getRemotingResponse().opCode(ResponseCode.USER_SERVICE_EXCEPTION); + } else if (exception instanceof IllegalArgumentException) { + context.getRemotingResponse().opCode(ResponseCode.PARAMETER_ERROR); + } else if (exception instanceof IllegalAccessException) { + context.getRemotingResponse().opCode(ResponseCode.ILLEGAL_ACCESS); + } else if (exception instanceof NullPointerException) { + context.getRemotingResponse().opCode(ResponseCode.NULL_POINTER); + } else { + context.getRemotingResponse().opCode(ResponseCode.USER_SERVICE_EXCEPTION); + } + String remarkMsg, exceptionMsg, exceptionClassname; + ServiceExceptionInvokeMessage serviceExceptionInvokeMessage = new ServiceExceptionInvokeMessage(); + if (exception instanceof InvocationTargetException) { + exceptionMsg = ((InvocationTargetException) exception).getTargetException().getMessage(); + remarkMsg = exceptionMsg == null ? "" : exceptionMsg; + exceptionClassname = ((InvocationTargetException) exception).getTargetException().getClass().getName(); + } else { + exceptionMsg = exception.getMessage(); + remarkMsg = exceptionMsg == null ? "" : exceptionMsg; + exceptionClassname = exception.getClass().getName(); + } + serviceExceptionInvokeMessage.setClassFullName(exceptionClassname); + serviceExceptionInvokeMessage.setErrorMessage(remarkMsg); + if (exception.getCause() != null) + serviceExceptionInvokeMessage.setThrowable(exception.getCause()); + Object[] args = new Object[] {serviceExceptionInvokeMessage}; + Serializer serializer = ServiceUtil.selectSerializer(args, request.serializerType()); + if (serializer != null) + context.getRemotingResponse().serializerType(serializer.type()); + context.getRemotingResponse().parameter(serviceExceptionInvokeMessage); + } + + private void processRequest0(final RpcProviderContext context, final RemotingCommand request) { + final RpcServiceCallBody serviceCallBody = + request.parameter(this.rpcInstanceAbstract.remotingService().serializerFactory(), RpcServiceCallBody.class); + final RpcEntry entry = this.serviceTable.get(serviceCallBody.getServiceId()); + if (entry != null) { + Runnable runnable = new Runnable() { + @Override + public void run() { + Type[] parameterTypes = entry.getMethod().getGenericParameterTypes(); + Object result = null; + Exception exception = null; + long startTime = System.currentTimeMillis(); + try { + if (parameterTypes.length == 0) { + result = entry.getMethod().invoke(entry.getObject()); + } else { + Object[] parameters = buildParameter(context, request, serviceCallBody, parameterTypes); + result = entry.getMethod().invoke(entry.getObject(), parameters); + } + serviceStats.addProviderOKQPSValue(serviceCallBody.getServiceId(), 1, 1); + serviceStats.addProviderRTValue(serviceCallBody.getServiceId(), (int) (System.currentTimeMillis() - startTime), 1); + } catch (Exception e) { + exception = e; + } + + if (exception != null) + dealWithException(context, request, exception); + else if (!entry.getMethod().getReturnType().equals(Void.class)) { + Object[] args = new Object[] {result}; + Serializer serializer = ServiceUtil.selectSerializer(args, request.serializerType()); + if (serializer != null) + context.getRemotingResponse().serializerType(serializer.type()); + context.getRemotingResponse().parameter(result); + } + } + }; + ExecutorService executorService = this.executorTable.get(serviceCallBody.getServiceId()); + if (executorService != null) { + executorService.submit(runnable); + } else { + runnable.run(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java new file mode 100644 index 0000000..0be1014 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.promise; + +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.rpc.api.Promise; +import org.apache.rocketmq.rpc.api.PromiseListener; +import org.apache.rocketmq.rpc.impl.exception.ServiceExceptionManager; +import org.apache.rocketmq.rpc.internal.RpcErrorMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultPromise<V> implements Promise<V> { + private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class); + private final Object lock = new Object(); + private volatile FutureState state = FutureState.DOING; + private V result = null; + private long timeout; + private long createTime; + private Throwable exception = null; + private List<PromiseListener<V>> promiseListenerList; + + public DefaultPromise() { + createTime = System.currentTimeMillis(); + promiseListenerList = new ArrayList<>(); + timeout = 5000; + } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return state.isCancelledState(); + } + + @Override + public boolean isDone() { + return state.isDoneState(); + } + + @Override + public V get() { + return result; + } + + @Override + public V get(final long timeout) { + synchronized (lock) { + if (!isDoing()) { + return getValueOrThrowable(); + } + + if (timeout <= 0) { + try { + lock.wait(); + } catch (Exception e) { + cancel(e); + } + return getValueOrThrowable(); + } else { + long waitTime = timeout - (System.currentTimeMillis() - createTime); + if (waitTime > 0) { + for (; ; ) { + try { + lock.wait(waitTime); + } catch (InterruptedException e) { + LOG.error("promise get value interrupted,excepiton:{}", e.getMessage()); + } + + if (!isDoing()) { + break; + } else { + waitTime = timeout - (System.currentTimeMillis() - createTime); + if (waitTime <= 0) { + break; + } + } + } + } + + if (isDoing()) { + timeoutSoCancel(); + } + } + return getValueOrThrowable(); + } + } + + @Override + public boolean set(final V value) { + if (value == null) + return false; + this.result = value; + return done(); + } + + @Override + public boolean setFailure(final Throwable cause) { + if (cause == null) + return false; + this.exception = cause; + return done(); + } + + @Override + public void addListener(final PromiseListener<V> listener) { + if (listener == null) { + throw new NullPointerException("FutureListener is null"); + } + + boolean notifyNow = false; + synchronized (lock) { + if (!isDoing()) { + notifyNow = true; + } else { + if (promiseListenerList == null) { + promiseListenerList = new ArrayList<>(); + } + promiseListenerList.add(listener); + } + } + + if (notifyNow) { + notifyListener(listener); + } + } + + @Override + public Throwable getThrowable() { + return exception; + } + + private void notifyListeners() { + if (promiseListenerList != null) { + for (PromiseListener<V> listener : promiseListenerList) { + notifyListener(listener); + } + } + } + + private boolean isSuccess() { + return isDone() && (exception == null); + } + + private void timeoutSoCancel() { + synchronized (lock) { + if (!isDoing()) { + return; + } + state = FutureState.CANCELLED; + exception = new RuntimeException("Get request result is timeout or interrupted"); + lock.notifyAll(); + } + notifyListeners(); + } + + private V getValueOrThrowable() { + if (exception != null) { + Throwable e = exception.getCause() != null ? exception.getCause() : exception; + throw ServiceExceptionManager.TranslateException(RpcErrorMapper.RpcErrorLatitude.LOCAL.getCode(), e); + } + notifyListeners(); + return result; + } + + private boolean isDoing() { + return state.isDoingState(); + } + + private boolean done() { + synchronized (lock) { + if (!isDoing()) { + return false; + } + + state = FutureState.DONE; + lock.notifyAll(); + } + + notifyListeners(); + return true; + } + + private void notifyListener(final PromiseListener<V> listener) { + try { + if (exception != null) + listener.operationFailed(this); + else + listener.operationCompleted(this); + } catch (Throwable t) { + LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t); + } + } + + private boolean cancel(Exception e) { + synchronized (lock) { + if (!isDoing()) { + return false; + } + + state = FutureState.CANCELLED; + exception = e; + lock.notifyAll(); + } + + notifyListeners(); + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java new file mode 100644 index 0000000..aebc5e3 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.promise; + +public enum FutureState { + /** + * the task is doing + **/ + DOING(0), + /** + * the task is done + **/ + DONE(1), + /** + * ths task is cancelled + **/ + CANCELLED(2); + + public final int value; + + private FutureState(int value) { + this.value = value; + } + + public boolean isCancelledState() { + return this == CANCELLED; + } + + public boolean isDoneState() { + return this == DONE; + } + + public boolean isDoingState() { + return this == DOING; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java new file mode 100644 index 0000000..ba0180c --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.rpc.impl.server; + +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.rpc.annotation.MethodType; +import org.apache.rocketmq.rpc.api.AdvancedServer; +import org.apache.rocketmq.rpc.api.Promise; +import org.apache.rocketmq.rpc.impl.service.RpcJdkProxy; +import org.apache.rocketmq.rpc.impl.service.RpcProxyFactory; + +public class AdvancedServerImpl implements AdvancedServer { + private final SimpleServerImpl simpleServer; + + public AdvancedServerImpl(final SimpleServerImpl simpleServer) { + this.simpleServer = simpleServer; + } + + @Override + public <T> T callSync(final RemotingChannel channel, final String serviceCode, final String version, + final Object[] parameter, final Class<T> responseType) throws Exception { + RemotingCommand request = simpleServer.createRemoteRequest(serviceCode, version, parameter); + RpcJdkProxy rpcJdkProxy = RpcProxyFactory.createServiceProxy(null, simpleServer, simpleServer.getRemotingServer(), simpleServer.getRpcCommonConfig(), channel); + return (T) simpleServer.invokeRemoteMethod(rpcJdkProxy, serviceCode, request, responseType, MethodType.SYNC); + } + + @Override + public <T> Promise<T> callAsync(final RemotingChannel channel, final String serviceCode, final String version, + final Object[] parameter, final Class<T> responseType) throws Exception { + RemotingCommand request = simpleServer.createRemoteRequest(serviceCode, version, parameter); + RpcJdkProxy rpcJdkProxy = RpcProxyFactory.createServiceProxy(null, simpleServer, simpleServer.getRemotingServer(), simpleServer.getRpcCommonConfig(), channel); + return (Promise<T>) simpleServer.invokeRemoteMethod(rpcJdkProxy, serviceCode, request, responseType, MethodType.ASYNC); + } + + @Override + public void callOneway(final RemotingChannel channel, final String serviceCode, final String version, + final Object[] parameter) throws Exception { + RemotingCommand request = simpleServer.createRemoteRequest(serviceCode, version, parameter); + RpcJdkProxy rpcJdkProxy = RpcProxyFactory.createServiceProxy(null, simpleServer, simpleServer.getRemotingServer(), simpleServer.getRpcCommonConfig(), channel); + simpleServer.invokeRemoteMethod(rpcJdkProxy, serviceCode, request, Void.TYPE, MethodType.ASYNC); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java new file mode 100644 index 0000000..e076cbe --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.server; + +import java.util.Properties; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.remoting.api.RemotingServer; +import org.apache.rocketmq.remoting.api.RemotingService; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; +import org.apache.rocketmq.remoting.external.ThreadUtils; +import org.apache.rocketmq.remoting.impl.netty.RemotingBootstrapFactory; +import org.apache.rocketmq.rpc.api.AdvancedServer; +import org.apache.rocketmq.rpc.api.SimpleServer; +import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig; +import org.apache.rocketmq.rpc.impl.service.RpcInstanceAbstract; +import org.apache.rocketmq.rpc.impl.service.RpcProxyFactory; + +public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServer { + private RemotingServer remotingServer; + private ExecutorService callServiceThreadPool; + private RpcCommonConfig rpcCommonConfig; + + public SimpleServerImpl(final RpcCommonConfig remotingConfig) { + this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig)); + this.rpcCommonConfig = remotingConfig; + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()), + "serverCallServiceThread", true); + } + + public SimpleServerImpl(final RpcCommonConfig remotingConfig, final RemotingServer remotingServer) { + super(remotingConfig); + this.remotingServer = remotingServer; + } + + @Override + public RemotingService remotingService() { + return this.remotingServer; + } + + @Override + public void registerServiceListener() { + + } + + @Override + public <T> T bind(final Class<T> service, final RemotingChannel channel, final Properties properties) { + return this.narrow0(service, RpcProxyFactory.createServiceProxy(service, this, remotingServer, rpcCommonConfig, channel)); + } + + @Override + public AdvancedServer advancedServer() { + return new AdvancedServerImpl(this); + } + + @Override + public void publish(final Object service) { + this.publishService0(service); + } + + @Override + public void publish(final Object service, final ExecutorService executorService) { + this.publishService0(service, executorService); + } + + @Override + public void start() { + super.start(); + this.remotingServer.start(); + } + + @Override + public void stop() { + super.stop(); + ThreadUtils.shutdownGracefully(this.callServiceThreadPool, 3000, TimeUnit.MILLISECONDS); + this.remotingServer.stop(); + } + + public RemotingServer getRemotingServer() { + return remotingServer; + } + + public void setRemotingServer(final RemotingServer remotingServer) { + this.remotingServer = remotingServer; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java new file mode 100644 index 0000000..c0a0e8c --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.service; + +import org.apache.rocketmq.remoting.api.channel.ChannelEventListener; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; + +public class RpcConnectionListener implements ChannelEventListener { + private RpcInstanceAbstract rpcInstanceAbstract; + + public RpcConnectionListener(RpcInstanceAbstract rpcInstanceAbstract) { + this.rpcInstanceAbstract = rpcInstanceAbstract; + } + + @Override + public void onChannelConnect(final RemotingChannel remotingChannel) { + this.rpcInstanceAbstract.registerServiceListener(); + } + + @Override + public void onChannelClose(final RemotingChannel remotingChannel) { + + } + + @Override + public void onChannelException(final RemotingChannel remotingChannel) { + + } + + @Override + public void onChannelIdle(final RemotingChannel remotingChannel) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java new file mode 100644 index 0000000..9bfd53a --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.service; + +import java.lang.reflect.Method; +import org.apache.rocketmq.rpc.annotation.RemoteService; + +public class RpcEntry { + private RemoteService serviceExport; + private Method method; + private Object object; + + public Method getMethod() { + return method; + } + + public void setMethod(Method method) { + this.method = method; + } + + public Object getObject() { + return object; + } + + public void setObject(Object object) { + this.object = object; + } + + public RemoteService getServiceExport() { + return serviceExport; + } + + public void setServiceExport(RemoteService serviceExport) { + this.serviceExport = serviceExport; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java new file mode 100644 index 0000000..2b1288c --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.service; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.remoting.external.ThreadUtils; +import org.apache.rocketmq.rpc.impl.command.RpcRequestCode; +import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig; +import org.apache.rocketmq.rpc.impl.context.RpcProviderContext; +import org.apache.rocketmq.rpc.impl.metrics.DefaultServiceAPIImpl; +import org.apache.rocketmq.rpc.impl.metrics.ThreadStats; +import org.apache.rocketmq.rpc.impl.processor.RpcRequestProcessor; + +import static org.apache.rocketmq.remoting.external.ThreadUtils.newThreadFactory; + +public abstract class RpcInstanceAbstract extends RpcProxyCommon { + protected final RpcRequestProcessor rpcRequestProcessor; + protected final ThreadLocal<RpcProviderContext> threadLocalProviderContext = new ThreadLocal<RpcProviderContext>(); + protected final RpcCommonConfig rpcCommonConfig; + protected ThreadStats threadStats; + private DefaultServiceAPIImpl defaultServiceAPI; + private ThreadPoolExecutor invokeServiceThreadPool; + + public RpcInstanceAbstract(RpcCommonConfig rpcCommonConfig) { + super(rpcCommonConfig); + this.threadStats = new ThreadStats(); + this.rpcCommonConfig = rpcCommonConfig; + this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats); + + this.invokeServiceThreadPool = new ThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS, + new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), newThreadFactory("rpcInvokeServiceThread", true)); + + } + + public void start() { + this.defaultServiceAPI = new DefaultServiceAPIImpl(this.serviceStats, threadStats); + this.serviceStats.start(); + this.publishService0(this.defaultServiceAPI); + this.remotingService().registerRequestProcessor(RpcRequestCode.CALL_SERVICE, this.rpcRequestProcessor, this.invokeServiceThreadPool); + } + + public void stop() { + this.serviceStats.stop(); + ThreadUtils.shutdownGracefully(this.invokeServiceThreadPool, 3000, TimeUnit.MILLISECONDS); + } + + protected void publishService0(Object service) { + this.rpcRequestProcessor.putNewService(service); + } + + protected void publishService0(Object service, ExecutorService executorService) { + this.rpcRequestProcessor.putNewService(service, executorService); + } + + protected <T> T narrow0(Class<T> service, RpcJdkProxy rpcJdkProxy) { + return rpcJdkProxy.newInstance(service); + } + + public abstract void registerServiceListener(); + + public ThreadPoolExecutor getInvokeServiceThreadPool() { + return invokeServiceThreadPool; + } + + public void setInvokeServiceThreadPool(ThreadPoolExecutor invokeServiceThreadPool) { + this.invokeServiceThreadPool = invokeServiceThreadPool; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java new file mode 100644 index 0000000..351c8b4 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.service; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.rpc.impl.exception.ServiceExceptionManager; +import org.apache.rocketmq.rpc.internal.RpcErrorMapper; + +public abstract class RpcJdkProxy implements InvocationHandler { + private final Class<?> service; + private final RpcProxyCommon rpcProxyCommon; + + public RpcJdkProxy(final Class<?> service, final RpcProxyCommon rpcProxyCommon) { + this.service = service; + this.rpcProxyCommon = rpcProxyCommon; + } + + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { + return rpcProxyCommon.invoke0(proxy, this, service, method, args); + } + + @SuppressWarnings("unchecked") + public <T> T newInstance(Class<T> service) { + try { + return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class[] {service}, this); + } catch (Exception e) { + throw ServiceExceptionManager.TranslateException(RpcErrorMapper.RpcErrorLatitude.LOCAL.getCode(), e); + } + } + + public abstract void invokeOneWay(final RemotingCommand request); + + public abstract void invokeAsync(final RemotingCommand request, final AsyncHandler handler); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java new file mode 100644 index 0000000..4cff74f --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.service; + +import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.RemotingClient; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig; + +public class RpcJdkProxyClient extends RpcJdkProxy { + private RemotingClient remotingClient; + private String remotingAddress; + private RpcCommonConfig rpcCommonConfig; + + public RpcJdkProxyClient(final Class<?> service, + final RpcProxyCommon rpcProxyCommon, + final RemotingClient remotingClient, + final RpcCommonConfig rpcCommonConfig, + final String remotingAddress) { + super(service, rpcProxyCommon); + this.remotingClient = remotingClient; + this.rpcCommonConfig = rpcCommonConfig; + this.remotingAddress = remotingAddress; + } + + @Override + public void invokeOneWay(final RemotingCommand request) { + this.remotingClient.invokeOneWay(remotingAddress, request, rpcCommonConfig.getServiceInvokeTimeout()); + } + + @Override + public void invokeAsync(final RemotingCommand request, final AsyncHandler handler) { + this.remotingClient.invokeAsync(remotingAddress, request, handler, rpcCommonConfig.getServiceInvokeTimeout()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java new file mode 100644 index 0000000..c67c8cc --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.service; + +import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.RemotingServer; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig; + +public class RpcJdkProxyServer extends RpcJdkProxy { + private final RemotingServer remotingServer; + private final RemotingChannel remotingChannel; + private final RpcCommonConfig rpcCommonConfig; + + public RpcJdkProxyServer(final Class<?> service, + final RpcProxyCommon rpcProxyCommon, + final RemotingServer remotingServer, + final RpcCommonConfig rpcCommonConfig, + final RemotingChannel remotingChannel) { + super(service, rpcProxyCommon); + this.remotingServer = remotingServer; + this.remotingChannel = remotingChannel; + this.rpcCommonConfig = rpcCommonConfig; + } + + @Override + public void invokeAsync(final RemotingCommand request, final AsyncHandler handler) { + this.remotingServer.invokeAsync(remotingChannel, request, handler, rpcCommonConfig.getServiceInvokeTimeout()); + } + + @Override + public void invokeOneWay(final RemotingCommand request) { + this.remotingServer.invokeOneWay(remotingChannel, request, rpcCommonConfig.getServiceInvokeTimeout()); + } +}