package main

// This program is a small in-memory HTTP message broker.
//
// A PUT request to /NAME with a "v" query parameter appends that value to the
// queue called NAME. A GET request to /NAME pops the oldest message; the
// optional "timeout" query parameter (whole seconds) makes the request wait
// for a message to arrive. GET answers 404 when no message is available.
// The listening port is taken from the single command-line argument.

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"
)

// maxTimeout is the largest representable time.Duration. Requested timeouts
// that would overflow when converted to nanoseconds are clamped to it.
const maxTimeout = time.Duration(1<<63 - 1)

// waiter represents one blocked consumer. ch has capacity 1 so that a message
// can be handed over while the broker lock is held without ever blocking.
type waiter struct {
	ch chan string
}

// queue holds the pending messages and the blocked consumers of one named
// queue. Messages are only stored while nobody is waiting and consumers only
// wait while nothing is stored, so at most one of the slices is non-empty.
type queue struct {
	items   []string
	waiters []*waiter
}

// handOff gives msg to the longest-waiting consumer and reports whether there
// was one. The caller must hold the broker lock.
func (q *queue) handOff(msg string) bool {
	if len(q.waiters) == 0 {
		return false
	}
	w := q.waiters[0]
	q.waiters[0] = nil // do not keep the waiter reachable via the backing array
	q.waiters = q.waiters[1:]
	w.ch <- msg // never blocks: each waiter receives at most one message
	return true
}

// broker is a set of named queues guarded by a single mutex.
type broker struct {
	mu     sync.Mutex
	queues map[string]*queue
}

// getOrCreate returns the queue called name, creating it on first use.
// The caller must hold b.mu.
func (b *broker) getOrCreate(name string) *queue {
	q := b.queues[name]
	if q == nil {
		q = &queue{}
		b.queues[name] = q
	}
	return q
}

// put delivers msg to the longest-waiting consumer of the named queue or, if
// nobody is waiting, appends it to the queue.
func (b *broker) put(name, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	q := b.getOrCreate(name)
	if q.handOff(msg) {
		return
	}
	q.items = append(q.items, msg)
}

// requeue returns a message that was handed to a consumer which can no longer
// use it. It goes to the next waiting consumer or, if there is none, back to
// the front of the queue so ordering is preserved.
func (b *broker) requeue(name, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	q := b.getOrCreate(name)
	if q.handOff(msg) {
		return
	}
	q.items = append([]string{msg}, q.items...)
}

// cancelWait unregisters w from the named queue. It reports false when w was
// no longer registered, which means put already claimed it and a message is
// sitting in w.ch.
func (b *broker) cancelWait(name string, w *waiter) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	q := b.getOrCreate(name)
	for i, queued := range q.waiters {
		if queued != w {
			continue
		}
		last := len(q.waiters) - 1
		copy(q.waiters[i:], q.waiters[i+1:])
		q.waiters[last] = nil
		q.waiters = q.waiters[:last]
		return true
	}
	return false
}

// getMessage pops the oldest message from the named queue. If the queue is
// empty and timeout is positive it waits up to timeout for one to arrive.
// The boolean result reports whether a message was obtained.
func (b *broker) getMessage(name string, timeout time.Duration) (string, bool) {
	return b.getMessageContext(context.Background(), name, timeout)
}

// getMessageContext is getMessage with cancellation: when ctx is done the wait
// is abandoned and no message is consumed, so a consumer that went away never
// swallows a message meant for somebody else.
func (b *broker) getMessageContext(ctx context.Context, name string, timeout time.Duration) (string, bool) {
	if ctx.Err() != nil {
		return "", false
	}

	b.mu.Lock()
	q := b.getOrCreate(name)
	if len(q.items) > 0 {
		msg := q.items[0]
		q.items[0] = "" // release the string held by the backing array
		q.items = q.items[1:]
		b.mu.Unlock()
		return msg, true
	}
	if timeout <= 0 {
		b.mu.Unlock()
		return "", false
	}
	w := &waiter{ch: make(chan string, 1)}
	q.waiters = append(q.waiters, w)
	b.mu.Unlock()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case msg := <-w.ch:
		return msg, true
	case <-timer.C:
		if b.cancelWait(name, w) {
			return "", false
		}
		// put claimed this waiter right at the deadline; the message is
		// already buffered, so this receive does not block.
		return <-w.ch, true
	case <-ctx.Done():
		if !b.cancelWait(name, w) {
			// A message was handed over concurrently with the cancellation.
			// Give it back instead of dropping it.
			b.requeue(name, <-w.ch)
		}
		return "", false
	}
}

// parseTimeout reads the optional "timeout" query parameter as a non-negative
// whole number of seconds. A missing or empty parameter yields zero. The
// boolean result is false when the value is not a non-negative integer.
// Values too large for a time.Duration are clamped to maxTimeout.
func parseTimeout(r *http.Request) (time.Duration, bool) {
	raw := r.URL.Query().Get("timeout")
	if raw == "" {
		return 0, true
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 0 {
		return 0, false
	}
	if int64(n) > int64(maxTimeout/time.Second) {
		// Multiplying would overflow int64 and could turn a huge timeout
		// into a negative one.
		return maxTimeout, true
	}
	return time.Duration(n) * time.Second, true
}

// queueName extracts the queue name from a request path. The path must be a
// single non-empty segment after the leading slash.
func queueName(path string) (string, error) {
	name := strings.TrimPrefix(path, "/")
	if name == "" || strings.Contains(name, "/") {
		return "", errors.New("bad queue name")
	}
	return name, nil
}

// newHandler returns the HTTP handler that exposes b.
//
// PUT stores the first "v" query value (400 if the parameter is absent),
// GET pops a message (400 on a bad timeout, 404 if none is available), and
// any other method is answered with 405. Invalid paths are answered with 404.
func newHandler(b *broker) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		name, err := queueName(r.URL.Path)
		if err != nil {
			http.NotFound(w, r)
			return
		}

		switch r.Method {
		case http.MethodPut:
			values, ok := r.URL.Query()["v"]
			if !ok {
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			b.put(name, values[0])
		case http.MethodGet:
			timeout, ok := parseTimeout(r)
			if !ok {
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			// The request context is cancelled when the client disconnects,
			// which releases the waiter without consuming a message.
			msg, ok := b.getMessageContext(r.Context(), name, timeout)
			if !ok {
				w.WriteHeader(http.StatusNotFound)
				return
			}
			// Best effort: a failed write means the client is gone and
			// there is nobody left to report the error to.
			_, _ = w.Write([]byte(msg))
		default:
			w.WriteHeader(http.StatusMethodNotAllowed)
		}
	})
}

// main starts the broker on the port given as the only command-line argument
// and exits with status 1 on a usage error or when the server stops.
func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: queue-broker <port>")
		os.Exit(1)
	}

	mux := http.NewServeMux()
	mux.Handle("/", newHandler(&broker{queues: make(map[string]*queue)}))

	srv := &http.Server{
		Addr:    ":" + os.Args[1],
		Handler: mux,
		// Bound the time spent reading request headers. No write timeout is
		// set because GET requests may legitimately block for a long time.
		ReadHeaderTimeout: 10 * time.Second,
	}
	if err := srv.ListenAndServe(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}