Awesome work! I would like to play with it a bit and get pprof data, just to check that we are all good and that lots of data on the interface won't hit the GC hard.
Diff comments: > diff --git a/src/maasagent/cmd/netmon/main.go > b/src/maasagent/cmd/netmon/main.go > index e83655f..09306fa 100644 > --- a/src/maasagent/cmd/netmon/main.go > +++ b/src/maasagent/cmd/netmon/main.go > @@ -1,9 +1,93 @@ > package main > > +/* > + Copyright 2023 Canonical Ltd. This software is licensed under the > + GNU Affero General Public License version 3 (see the file LICENSE). > +*/ > + > import ( > + "context" > + "encoding/json" > + "errors" > + "os" > + "strconv" > + > + "github.com/rs/zerolog" > + "github.com/rs/zerolog/log" > + "golang.org/x/sync/errgroup" > + > "launchpad.net/maas/maas/src/maasagent/internal/netmon" > ) > > +var ( > + ErrMissingIface = errors.New("Missing interface argument") > +) > + > +func Run() int { > + var ( > + debug bool > + err error > + ) > + > + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) > + > + debugStr, ok := os.LookupEnv("DEBUG") Maybe a "nice to have" thing is to use zerolog.ParseLevel() so we are not limited to Info and Debug, but can provide a flexible log level? 
Example: https://git.launchpad.net/~troyanov/maas/commit/?id=1a1a847d772dcc7123f9c8f8b0b4b9bda135fe6b > + if ok { > + debug, err = strconv.ParseBool(debugStr) > + if err != nil { > + log.Error().Err(err).Msg("Unable to parse debug flag") > + return 2 > + } > + } > + > + if debug { > + zerolog.SetGlobalLevel(zerolog.DebugLevel) > + } else { > + zerolog.SetGlobalLevel(zerolog.InfoLevel) > + } > + > + if len(os.Args) < 2 { > + log.Error().Err(ErrMissingIface).Msg("Please provide an > interface to monitor") > + return 2 > + } > + iface := os.Args[1] > + > + bkg := context.Background() Just a nitpick: since `bkg` is not used anywhere on its own, it can be changed to a one-liner: ctx, cancel := context.WithCancel(context.Background()) > + ctx, cancel := context.WithCancel(bkg) > + > + sigC := make(chan os.Signal) > + resultC := make(chan netmon.Result) > + > + g, ctx := errgroup.WithContext(ctx) > + g.SetLimit(2) > + > + svc := netmon.NewService(iface) > + g.Go(func() error { > + return svc.Start(ctx, resultC) > + }) > + g.Go(func() error { > + encoder := json.NewEncoder(os.Stdout) > + for { > + select { > + case <-sigC: > + cancel() > + return nil > + case res := <-resultC: > + err = encoder.Encode(res) > + if err != nil { > + return err > + } > + } > + } > + }) > + log.Info().Msg("Service netmon started") > + if err := g.Wait(); err != nil { > + log.Error().Err(err).Msg("") It seems that we can remove `.Msg("")`. From the doc: // Msg sends the *Event with msg added as the message field if not empty. 
> + return 1 > + } > + return 0 > +} > + > func main() { > - netmon.NewService() > + os.Exit(Run()) > } > diff --git a/src/maasagent/go.mod b/src/maasagent/go.mod > index 9bee24c..6adeafc 100644 > --- a/src/maasagent/go.mod > +++ b/src/maasagent/go.mod > @@ -1,3 +1,30 @@ > module launchpad.net/maas/maas/src/maasagent > > go 1.18 > + > +require ( > + github.com/mdlayher/arp v0.0.0-20220512170110-6706a2966875 I don't see any direct imports of `github.com/mdlayher/arp`, maybe a missing `go mod tidy` run here? > + github.com/packetcap/go-pcap v0.0.0-20230225181818-eba71accde5b > + github.com/rs/zerolog v1.29.1 > + github.com/stretchr/testify v1.7.0 > +) > + > +require ( > + github.com/davecgh/go-spew v1.1.1 // indirect > + github.com/google/go-cmp v0.5.8 // indirect > + github.com/google/gopacket v1.1.17 // indirect > + github.com/josharian/native v1.0.0 // indirect > + github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect > + github.com/mattn/go-colorable v0.1.12 // indirect > + github.com/mattn/go-isatty v0.0.14 // indirect > + github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118 // > indirect > + github.com/mdlayher/packet v1.0.0 // indirect > + github.com/mdlayher/socket v0.2.1 // indirect > + github.com/pmezard/go-difflib v1.0.0 // indirect > + github.com/sirupsen/logrus v1.4.2 // indirect > + golang.org/x/net v0.7.0 // indirect > + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect > + golang.org/x/sys v0.5.0 // indirect > + gopkg.in/yaml.v2 v2.2.8 // indirect > + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect > +) > diff --git a/src/maasagent/internal/arp/ethernet.go > b/src/maasagent/internal/arp/ethernet.go > new file mode 100644 > index 0000000..0d3e7d3 > --- /dev/null > +++ b/src/maasagent/internal/arp/ethernet.go > @@ -0,0 +1,133 @@ > +package arp > + > +/* > + Copyright 2023 Canonical Ltd. This software is licensed under the > + GNU Affero General Public License version 3 (see the file LICENSE). 
> +*/ > + > +import ( > + "encoding/binary" > + "errors" > + "io" > + "net" > +) > + > +const ( > + minEthernetLen = 14 > +) > + > +const ( > + // EthernetTypeLLC is a special ethernet type, if found the frame is > truncated > + EthernetTypeLLC uint16 = 0 > + // EthernetTypeIPv4 is the ethernet type for a frame containing an IPv4 > packet > + EthernetTypeIPv4 uint16 = 0x0800 > + // EthernetTypeARP is the ethernet type for a frame containing an ARP > packet > + EthernetTypeARP uint16 = 0x0806 > + // EthernetTypeIPv6 is the ethernet type for a frame containing an IPv6 > packet > + EthernetTypeIPv6 uint16 = 0x86dd > + // EthernetTypeVLAN is the ethernet type for a frame containing a VLAN > tag, > + // the VLAN tag bytes will indicate the actual type of packet the frame > contains > + EthernetTypeVLAN uint16 = 0x8100 > + > + // NonStdLenEthernetTypes is a magic number to find any non-standard > types > + // and mark them as EthernetTypeLLC > + NonStdLenEthernetTypes uint16 = 0x600 > +) > + > +var ( > + // ErrNotVLAN is an error returned when calling > EthernetFrame.ExtractVLAN > + // if the frame is not of type EthernetTypeVLAN > + ErrNotVLAN = errors.New("ethernet frame not of type VLAN") > + // ErrMalformedVLAN is an error returned when parsing a VLAN tag > + // that is malformed > + ErrMalformedVLAN = errors.New("VLAN tag is malformed") > +) > + > +// VLAN represents a VLAN tag within an ethernet frame > +type VLAN struct { > + Priority uint8 > + DropEligible bool > + ID uint16 > + EthernetType uint16 Just curious, why don't we want to have `VLAN *VLAN` as a field of the `EthernetFrame` type? 
> +} > + > +// UnmarshalBinary will take the ethernet frame's payload > +// and extract a VLAN tag if one is present > +func (v *VLAN) UnmarshalBinary(buf []byte) error { > + if len(buf) < 4 { > + return ErrMalformedVLAN > + } > + > + // extract the first 3 bits > + v.Priority = (buf[0] & 0xe0) >> 5 > + // extract the next bit and turn it into a bool > + v.DropEligible = buf[0]&0x10 != 0 > + // extract the next 12 bits for an ID > + v.ID = binary.BigEndian.Uint16(buf[:2]) & 0x0fff > + // last 2 bytes are ethernet type > + v.EthernetType = binary.BigEndian.Uint16(buf[2:]) > + return nil > +} > + > +// EthernetFrame represents an ethernet frame > +type EthernetFrame struct { > + SrcMAC net.HardwareAddr > + DstMAC net.HardwareAddr > + EthernetType uint16 > + Len uint16 > + Payload []byte > +} > + > +// ExtractARPPacket will extract an ARP packet from the ethernet frame's > +// payload > +func (e *EthernetFrame) ExtractARPPacket() (*Packet, error) { > + var buf []byte It seems that this method might be on a hot path. Maybe using `sync.Pool` would give us an improvement here? 
> + if e.EthernetType == EthernetTypeVLAN { > + buf = e.Payload[4:] > + } else { > + buf = e.Payload > + } > + a := &Packet{} > + err := a.UnmarshalBinary(buf) > + if err != nil { > + return nil, err > + } > + return a, nil > +} > + > +// ExtractVLAN will extract the VLAN tag from the ethernet frame's > +// payload if one is present and return ErrNotVLAN if not > +func (e *EthernetFrame) ExtractVLAN() (*VLAN, error) { > + if e.EthernetType != EthernetTypeVLAN { > + return nil, ErrNotVLAN > + } > + v := &VLAN{} > + err := v.UnmarshalBinary(e.Payload[0:4]) > + if err != nil { > + return nil, err > + } > + return v, nil > +} > + > +// UnmarshalBinary parses ethernet frame bytes into an EthernetFrame > +func (eth *EthernetFrame) UnmarshalBinary(buf []byte) error { > + if len(buf) < minEthernetLen { > + return io.ErrUnexpectedEOF > + } > + > + eth.DstMAC = buf[0:6] > + eth.SrcMAC = buf[6:12] > + eth.EthernetType = binary.BigEndian.Uint16(buf[12:14]) > + eth.Payload = buf[14:] > + if eth.EthernetType < NonStdLenEthernetTypes { > + eth.Len = eth.EthernetType Not very obvious why we assign eth.EthernetType to Len? > + eth.EthernetType = EthernetTypeLLC > + cmp := len(eth.Payload) - int(eth.Len) > + if cmp < 0 { > + return io.ErrUnexpectedEOF > + } else if cmp > 0 { > + eth.Payload = eth.Payload[:len(eth.Payload)-cmp] > + } > + } > + return nil > +} > diff --git a/src/maasagent/internal/arp/packet.go > b/src/maasagent/internal/arp/packet.go > new file mode 100644 > index 0000000..d397e85 > --- /dev/null > +++ b/src/maasagent/internal/arp/packet.go > @@ -0,0 +1,163 @@ > +package arp > + > +/* > + Copyright 2023 Canonical Ltd. This software is licensed under the > + GNU Affero General Public License version 3 (see the file LICENSE). 
> +*/ > + > +import ( > + "encoding/binary" > + "errors" > + "fmt" > + "io" > + "net" > + "net/netip" > +) > + > +const ( > + // HardwareTypeReserved is a special value for hardware type > + HardwareTypeReserved uint16 = iota // see RFC5494 > + // HardwareTypeEthernet is the hardware type value for Ethernet > + // we only care about ethernet, but additional types are defined for > + // testing and possible future use > + HardwareTypeEthernet > + // HardwareTypeExpEth is the hardware type for experimental ethernet > + HardwareTypeExpEth > + // HardwareTypeAX25 is the hardware type for Radio AX.25 > + HardwareTypeAX25 > + _ That might be a dangerous one if we have to extend this list of constants, as is `HardwareTypeHIPARP uint16 = iota + 28`. In fact, `iota` is already 9 on that line (the `_`, FiberChannel and SerialLine specs all advance it), so `HardwareTypeHIPARP` evaluates to 37 rather than the IANA-assigned 28, and the constants after it are off by the same amount. Maybe it would be better to define values explicitly for every constant? > + // HardwareTypeChaos is a chaos value for hardware type > + HardwareTypeChaos > + // HardwareTypeIEEE802 is for IEEE 802 networks > + HardwareTypeIEEE802 > + > + // skipping propriatary networks > + > + // HardwareTypeFiberChannel is the hardware type for fiber channel > + HardwareTypeFiberChannel uint16 = 18 > + // HardwareTypeSerialLine is the hardware type for serial line > + HardwareTypeSerialLine uint16 = 19 > + // HardwareTypeHIPARP is the hardware type for HIPARP > + HardwareTypeHIPARP uint16 = iota + 28 > + // HardwareTypeIPARPISO7163 is the hardware type for IP and ARP over > ISO 7816-3 > + HardwareTypeIPARPISO7163 > + // HardwareTypeARPSec is the hardware type for ARPSec > + HardwareTypeARPSec > + // HardwareTypeIPSec is the hardware type for IPSec tunnel > + HardwareTypeIPSec > + // HardwareTypeInfiniBand is the hardware type for InfiniBand > + HardwareTypeInfiniBand > +) > + > +const ( > + // ProtocolTypeIPv4 is the value for IPv4 ARP packets > + ProtocolTypeIPv4 uint16 = 0x0800 > + // ProtocolTypeIPv6 is the value for IPv6 ARP packets, > + // which shouldn't be used, this is defined for testing purposes > + ProtocolTypeIPv6 uint16 = 
0x86dd > +) > + > +const ( > + // OpReserved is a special reserved OpCode > + OpReserved uint16 = iota // see RFC5494 > + // OpRequest is the OpCode for ARP requests > + OpRequest > + // OpReply is the OpCode for ARP replies > + OpReply > +) > + > +var ( > + // ErrMalformedPacket is an error returned when parsing a malformed ARP > packet > + ErrMalformedPacket = errors.New("malformed ARP packet") > +) > + > +// Packet is a struct containing the data of an ARP packet > +type Packet struct { > + HardwareType uint16 > + ProtocolType uint16 > + HardwareAddrLen uint8 > + ProtocolAddrLen uint8 > + OpCode uint16 > + SendHwdAddr net.HardwareAddr > + SendIPAddr netip.Addr > + TgtHwdAddr net.HardwareAddr > + TgtIPAddr netip.Addr > +} > + > +func checkPacketLen(buf []byte, bytesRead, length int) error { > + if len(buf[bytesRead:]) < length { > + return io.ErrUnexpectedEOF > + } > + return nil > +} > + > +// UnmarsahalBinary takes the ARP packet bytes and parses it into a Packet > +func (pkt *Packet) UnmarshalBinary(buf []byte) error { > + var ( > + bytesRead int > + ) > + > + err := checkPacketLen(buf, bytesRead, 8) > + if err != nil { > + return fmt.Errorf("%w: packet missing initial ARP fields", err) > + } > + > + pkt.HardwareType = binary.BigEndian.Uint16(buf[0:2]) > + pkt.ProtocolType = binary.BigEndian.Uint16(buf[2:4]) > + pkt.HardwareAddrLen = buf[4] > + pkt.ProtocolAddrLen = buf[5] > + pkt.OpCode = binary.BigEndian.Uint16(buf[6:8]) > + > + bytesRead = 8 > + hwdAddrLen := int(pkt.HardwareAddrLen) > + ipAddrLen := int(pkt.ProtocolAddrLen) > + > + err = checkPacketLen(buf, bytesRead, hwdAddrLen) > + if err != nil { > + return fmt.Errorf("%w: packet too short for sender hardware > address", err) > + } > + > + sendHwdAddrBuf := make([]byte, hwdAddrLen) > + copy(sendHwdAddrBuf[:], buf[bytesRead:bytesRead+hwdAddrLen]) > + pkt.SendHwdAddr = sendHwdAddrBuf > + bytesRead += hwdAddrLen > + > + err = checkPacketLen(buf, bytesRead, ipAddrLen) > + if err != nil { > + return 
fmt.Errorf("%w: packet too short for sender IP address", > err) > + } > + > + var ok bool > + > + sendIPAddrBuf := make([]byte, ipAddrLen) > + copy(sendIPAddrBuf[:], buf[bytesRead:bytesRead+ipAddrLen]) > + pkt.SendIPAddr, ok = netip.AddrFromSlice(sendIPAddrBuf) > + if !ok { > + return fmt.Errorf("%w: invalid sender IP address", > ErrMalformedPacket) > + } > + bytesRead += ipAddrLen > + > + err = checkPacketLen(buf, bytesRead, hwdAddrLen) > + if err != nil { > + return fmt.Errorf("%w: packet too short for target hardware > address", err) > + } > + > + tgtHwdAddrBuf := make([]byte, hwdAddrLen) > + copy(tgtHwdAddrBuf[:], buf[bytesRead:bytesRead+hwdAddrLen]) > + pkt.TgtHwdAddr = tgtHwdAddrBuf > + bytesRead += hwdAddrLen > + > + err = checkPacketLen(buf, bytesRead, ipAddrLen) > + if err != nil { > + return fmt.Errorf("%w: packet too short for target IP address", > err) > + } > + > + tgtIPAddrBuf := make([]byte, ipAddrLen) > + copy(tgtIPAddrBuf[:], buf[bytesRead:bytesRead+ipAddrLen]) > + pkt.TgtIPAddr, ok = netip.AddrFromSlice(tgtIPAddrBuf) > + if !ok { > + return fmt.Errorf("%w: invalid target IP address", > ErrMalformedPacket) > + } > + > + return nil > +} > diff --git a/src/maasagent/internal/netmon/service.go > b/src/maasagent/internal/netmon/service.go > index 14ebdc6..a88604f 100644 > --- a/src/maasagent/internal/netmon/service.go > +++ b/src/maasagent/internal/netmon/service.go > @@ -1,3 +1,242 @@ > package netmon > > -func NewService() {} > +/* > + Copyright 2023 Canonical Ltd. This software is licensed under the > + GNU Affero General Public License version 3 (see the file LICENSE). 
> +*/ > + > +import ( > + "bytes" > + "context" > + "errors" > + "fmt" > + "net" > + "net/netip" > + "time" > + > + pcap "github.com/packetcap/go-pcap" > + "github.com/rs/zerolog/log" > + > + "launchpad.net/maas/maas/src/maasagent/internal/arp" > +) > + > +const ( > + snapLen = 64 > + timeout = 5 * time.Minute > + seenAgainThreshold = 600 * time.Second > +) > + > +const ( > + // EventNew is the Event value for a new Result > + EventNew = "NEW" > + // EventRefreshed is the Event value for a Result that is for > + // refreshed ARP values > + EventRefreshed = "REFRESHED" > + // EventMoved is the Event value for a Result where the IP has > + // changed its MAC address > + EventMoved = "MOVED" > +) > + > +var ( > + // ErrEmptyPacket is returned when a packet of 0 bytes has been received > + ErrEmptyPacket = errors.New("received an empty packet") > +) > + > +// Binding represents the binding between an IP address and MAC address > +type Binding struct { > + // IP is the IP a binding is tracking > + IP netip.Addr > + // MAC is the MAC address the IP is currently bound to > + MAC net.HardwareAddr > + // VID is the associated VLAN ID, if one exists > + VID *uint16 > + // Time is the time the packet creating / updating the binding > + // was observed > + Time time.Time > +} > + > +// Result is the result of observed ARP packets > +type Result struct { > + // IP is the presentation format of an observed IP > + IP string `json:"ip"` > + // MAC is the presentation format of an observed MAC > + MAC string `json:"mac"` > + // Previous MAC is the presentation format of a previous MAC if > + // an EventMoved was observed > + PreviousMAC string `json:"previous_mac,omitempty"` > + // Event is the type of event the Result is > + Event string `json:"event"` > + // Time is the time the packet creating the Result was observed > + Time int64 `json:"time"` > + // VID is the VLAN ID if one exists > + VID *uint16 `json:"vid"` > +} > + > +// Service is responsible for starting packet capture 
and > +// converting observed ARP packets into discovered Results > +type Service struct { > + iface string > + bindings map[string]Binding > +} > + > +// NewService returns a pointer to a Service. It > +// takes the desired interface to observe's name as an argument > +func NewService(iface string) *Service { > + return &Service{ > + iface: iface, > + bindings: make(map[string]Binding), > + } > +} > + > +func (s *Service) updateBindings(pkt *arp.Packet, vid *uint16, timestamp > time.Time) (res []Result) { > + if timestamp.IsZero() { > + timestamp = time.Now() > + } > + > + var vidLabel int Can be shortened? var vidLabel int if vid != nil { vidLabel = int(*vid) } > + if vid == nil { > + vidLabel = 0 > + } else { > + vidLabel = int(*vid) > + } > + > + discoveredBindings := []Binding{ > + { > + IP: pkt.SendIPAddr, > + MAC: pkt.SendHwdAddr, > + VID: vid, > + Time: timestamp, > + }, > + } > + if pkt.OpCode == arp.OpReply { > + discoveredBindings = append(discoveredBindings, Binding{ > + IP: pkt.TgtIPAddr, > + MAC: pkt.TgtHwdAddr, > + VID: vid, > + Time: timestamp, > + }) > + } > + > + for _, discoveredBinding := range discoveredBindings { > + key := fmt.Sprintf("%d_%s", vidLabel, > discoveredBinding.IP.String()) > + binding, ok := s.bindings[key] > + if ok { > + if bytes.Compare(binding.MAC, discoveredBinding.MAC) != > 0 { > + s.bindings[key] = discoveredBinding > + res = append(res, Result{ > + IP: > discoveredBinding.IP.String(), > + PreviousMAC: binding.MAC.String(), > + MAC: > discoveredBinding.MAC.String(), > + VID: discoveredBinding.VID, > + Time: > discoveredBinding.Time.Unix(), > + Event: EventMoved, > + }) > + } else if discoveredBinding.Time.Sub(binding.Time) >= > seenAgainThreshold { > + s.bindings[key] = discoveredBinding > + res = append(res, Result{ > + IP: discoveredBinding.IP.String(), > + MAC: discoveredBinding.MAC.String(), > + VID: discoveredBinding.VID, > + Time: discoveredBinding.Time.Unix(), > + Event: EventRefreshed, > + }) > + } > + } else { > + 
s.bindings[key] = discoveredBinding > + res = append(res, Result{ > + IP: discoveredBinding.IP.String(), > + MAC: discoveredBinding.MAC.String(), > + VID: discoveredBinding.VID, > + Time: discoveredBinding.Time.Unix(), > + Event: EventNew, > + }) > + } > + } > + > + return res > +} > + > +func isValidARPPacket(pkt *arp.Packet) bool { > + if pkt.HardwareType != arp.HardwareTypeEthernet { > + return false > + } > + if pkt.ProtocolType != arp.ProtocolTypeIPv4 { > + return false > + } > + if pkt.HardwareAddrLen != 6 { > + return false > + } > + if pkt.ProtocolAddrLen != 4 { > + return false > + } > + return true > +} > + > +func (s *Service) handlePacket(pkt pcap.Packet) ([]Result, error) { > + if pkt.Error != nil { > + return nil, pkt.Error > + } > + if len(pkt.B) == 0 { > + return nil, ErrEmptyPacket > + } > + eth := &arp.EthernetFrame{} > + err := eth.UnmarshalBinary(pkt.B) > + if err != nil { > + return nil, err > + } > + > + if eth.EthernetType != arp.EthernetTypeVLAN && eth.EthernetType != > arp.EthernetTypeARP { > + log.Debug().Msg("skipping non-ARP packet") > + return nil, nil > + } > + > + var vid *uint16 > + if eth.EthernetType == arp.EthernetTypeVLAN { > + vlan, err := eth.ExtractVLAN() > + if err != nil { > + return nil, err > + } > + vid = &vlan.ID > + } > + > + arpPkt, err := eth.ExtractARPPacket() > + if err != nil { > + return nil, err > + } > + > + if !isValidARPPacket(arpPkt) { > + log.Debug().Msg("skipping non-ethernet+IPv4 ARP packet") > + return nil, nil > + } > + return s.updateBindings(arpPkt, vid, pkt.Info.Timestamp), nil > +} > + > +// Start will start packet capture and send results to Service.ResC > +// if an error occurs, it is sent to Service.ErrC > +func (s *Service) Start(ctx context.Context, resultC chan<- Result) error { > + defer close(resultC) > + > + hndlr, err := pcap.OpenLive(s.iface, snapLen, false, timeout, true) > + if err != nil { > + return err > + } > + defer hndlr.Close() > + pkts := hndlr.Listen() > + for { > + select { > + 
case <-ctx.Done(): > + return nil > + case pkt, ok := <-pkts: > + if !ok { > + log.Debug().Msg("packet capture has closed") > + } > + res, err := s.handlePacket(pkt) > + if err != nil { > + return err > + } > + for _, r := range res { > + resultC <- r > + } > + } > + } > +} -- https://code.launchpad.net/~cgrabowski/maas/+git/maas/+merge/441702 Your team MAAS Committers is subscribed to branch maas:master. -- Mailing list: https://launchpad.net/~sts-sponsors Post to : sts-sponsors@lists.launchpad.net Unsubscribe : https://launchpad.net/~sts-sponsors More help : https://help.launchpad.net/ListHelp