Repository: qpid-proton Updated Branches: refs/heads/master 33ec78e50 -> c36a1a711
NO-JIRA: [go] example of using a selector Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c36a1a71 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c36a1a71 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c36a1a71 Branch: refs/heads/master Commit: c36a1a711720e51b42a9d8bba7ee8c66210f4b97 Parents: 33ec78e Author: Alan Conway <acon...@redhat.com> Authored: Fri Oct 19 19:26:47 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Tue Oct 23 18:17:24 2018 -0400 ---------------------------------------------------------------------- go/examples/electron/selected_recv.go | 110 +++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c36a1a71/go/examples/electron/selected_recv.go ---------------------------------------------------------------------- diff --git a/go/examples/electron/selected_recv.go b/go/examples/electron/selected_recv.go new file mode 100644 index 0000000..b1c6e6f --- /dev/null +++ b/go/examples/electron/selected_recv.go @@ -0,0 +1,110 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package main + +import ( + "flag" + "fmt" + "log" + "os" + "strings" + + "qpid.apache.org/amqp" + "qpid.apache.org/electron" +) + +// NOTE: this example requires a broker thta supports the APACHE.ORG:SELECTOR filter. + +// Usage and command-line flags +func usage() { + fmt.Fprintf(os.Stderr, `Usage: %s +Receive messages that match a query. +`, os.Args[0]) + flag.PrintDefaults() +} + +var count = flag.Int("count", 5, "Number of messages to send, will only receive half") +var url = flag.String("url", "/examples", "Receive messages from this URL") + +// Example function to create an AMQP filter-set containing an APACHE.ORG:SELECTOR filter. +// This filters messages using an SQL-like query expression. +// See http://www.amqp.org/specification/1.0/filters for more +func selectorFilter(query string) map[amqp.Symbol]interface{} { + filter := amqp.Described{ + Descriptor: amqp.Symbol("apache.org:selector-filter:string"), // Identify the type of filter + Value: query, // Query to evaluate + } + // Return a filter set (map) with a single selector filter. + // Note "selector" is an arbitrary name to identify the filter, it can be any non-empty string. + return map[amqp.Symbol]interface{}{"selector": filter} +} + +func main() { + flag.Usage = usage + flag.Parse() + u, err := amqp.ParseURL(*url) + fatalIf(err) + container := electron.NewContainer(fmt.Sprintf("%s[%v]", os.Args[0], os.Getpid())) + c, err := container.Dial("tcp", u.Host) // NOTE: Dial takes just the Host part of the U + fatalIf(err) + defer c.Close(nil) + addr := strings.TrimPrefix(u.Path, "/") + s, err := c.Sender(electron.Target(addr)) + fatalIf(err) + r, err := c.Receiver(electron.Source(addr), electron.Filter(selectorFilter("colour = 'green'"))) + fatalIf(err) + + send(s, *count) + receive(r, *count/2) +} + +func send(s electron.Sender, n int) { + defer s.Close(nil) + for i := 0; i < n; i++ { + m := amqp.NewMessageWith(i) + if i%2 == 0 { + m.ApplicationProperties()["colour"] = "red" + } else { + m.ApplicationProperties()["colour"] = "green" + } + s.SendSync(m) + } +} + +func receive(r electron.Receiver, n int) { + defer r.Close(nil) + for i := 0; i < n; i++ { + switch rm, err := r.Receive(); err { + case nil: + rm.Accept() + log.Print(rm.Message) + case electron.Closed: + return + default: + log.Fatalf("receive error: %v", err) + } + } +} + +func fatalIf(err error) { + if err != nil { + log.Fatal(err) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org