Files
2026-06-22 13:14:00 +05:00

154 lines
2.7 KiB
Go

package main
import (
"errors"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
)
// waiter is one consumer blocked in getMessage. ch has capacity 1 so that put
// can hand a message over while holding the broker lock without ever blocking.
type waiter struct {
	ch chan string
}

// queue is the state of a single named queue: messages nobody has consumed
// yet and consumers waiting for a message, both in arrival order. put only
// stores a message when nobody is waiting and getMessage only waits when
// nothing is stored, so at most one of the two slices is non-empty.
type queue struct {
	items   []string
	waiters []*waiter
}

// broker is an in-memory collection of named FIFO queues. mu guards the map
// and every queue reachable from it.
type broker struct {
	mu     sync.Mutex
	queues map[string]*queue
}

// getOrCreate returns the queue registered under name, creating it (and, for
// a zero-value broker, the map itself) on first use. b.mu must be held.
func (b *broker) getOrCreate(name string) *queue {
	q := b.queues[name]
	if q != nil {
		return q
	}
	if b.queues == nil {
		b.queues = make(map[string]*queue)
	}
	q = &queue{}
	b.queues[name] = q
	return q
}

// put publishes msg on the named queue. If a consumer is already waiting, the
// longest-waiting one receives msg directly; otherwise msg is stored until a
// later getMessage collects it.
func (b *broker) put(name, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	q := b.getOrCreate(name)
	if len(q.waiters) == 0 {
		q.items = append(q.items, msg)
		return
	}

	w := q.waiters[0]
	q.waiters[0] = nil // don't let the backing array keep the waiter alive
	q.waiters = q.waiters[1:]
	w.ch <- msg // never blocks: ch is buffered and each waiter is sent to once
}

// getMessage removes and returns the oldest message of the named queue.
//
// If the queue is empty and timeout is positive, the call blocks for up to
// timeout waiting for a put; waiting consumers are served in arrival order.
// The boolean result is false when no message was obtained. A call that does
// not wait never creates the queue, and a call that gives up after waiting
// removes the queue again if it left it without messages and waiters, so
// reads of unused names do not accumulate state in the broker.
func (b *broker) getMessage(name string, timeout time.Duration) (string, bool) {
	b.mu.Lock()
	if q := b.queues[name]; q != nil && len(q.items) > 0 {
		msg := q.items[0]
		q.items[0] = "" // release the message held by the backing array
		q.items = q.items[1:]
		b.mu.Unlock()
		return msg, true
	}
	if timeout <= 0 {
		b.mu.Unlock()
		return "", false
	}
	w := &waiter{ch: make(chan string, 1)}
	q := b.getOrCreate(name)
	q.waiters = append(q.waiters, w)
	b.mu.Unlock()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case msg := <-w.ch:
		return msg, true
	case <-timer.C:
	}

	b.mu.Lock()
	stillQueued := b.dropWaiter(name, w)
	b.mu.Unlock()
	if stillQueued {
		return "", false
	}
	// A put dequeued w before we could withdraw it. put sends while holding
	// the lock, so the message is already in the channel and must be taken
	// here or it would be lost.
	return <-w.ch, true
}

// dropWaiter withdraws w from the named queue and reports whether it was still
// registered there. When the withdrawal leaves the queue with neither waiters
// nor messages, the queue is removed from the broker. b.mu must be held.
func (b *broker) dropWaiter(name string, w *waiter) bool {
	q := b.queues[name]
	if q == nil {
		return false
	}
	for i, queued := range q.waiters {
		if queued != w {
			continue
		}
		last := len(q.waiters) - 1
		copy(q.waiters[i:], q.waiters[i+1:])
		q.waiters[last] = nil // clear the vacated tail slot
		q.waiters = q.waiters[:last]
		if len(q.waiters) == 0 && len(q.items) == 0 {
			delete(b.queues, name)
		}
		return true
	}
	return false
}
// parseTimeout reads the optional "timeout" query parameter, a whole number
// of seconds.
//
// It returns (0, true) when the parameter is absent or empty and (0, false)
// when it is not a non-negative integer that fits in an int. A value too
// large to express as a time.Duration is clamped to the longest representable
// whole-second duration rather than being allowed to overflow.
func parseTimeout(r *http.Request) (time.Duration, bool) {
	// Largest number of seconds whose nanosecond count still fits in an int64.
	const maxSeconds = (1<<63 - 1) / int64(time.Second)

	raw := r.URL.Query().Get("timeout")
	if raw == "" {
		return 0, true
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 0 {
		return 0, false
	}

	secs := int64(n)
	if secs > maxSeconds {
		// Multiplying anything larger by time.Second wraps around to an
		// arbitrary, possibly negative, duration.
		secs = maxSeconds
	}
	return time.Duration(secs) * time.Second, true
}
// queueName derives a queue name from a URL path by dropping one leading
// slash. The remainder must be non-empty and must not contain another slash;
// otherwise an error is returned together with an empty name.
func queueName(path string) (string, error) {
	segment := strings.TrimPrefix(path, "/")
	switch {
	case segment == "":
		// The path was empty or just "/".
	case strings.Contains(segment, "/"):
		// More than one path segment, or a trailing slash.
	default:
		return segment, nil
	}
	return "", errors.New("bad queue name")
}
// queueHandler returns the HTTP handler for the broker's only endpoint,
// /<queue>:
//
//	PUT /<queue>?v=<msg>       enqueue msg; 400 when v is absent
//	GET /<queue>[?timeout=<n>] dequeue one message, waiting up to n seconds;
//	                           404 when none is available, 400 when timeout
//	                           is invalid
//
// A path that does not name exactly one queue yields 404 and any other method
// yields 405.
func queueHandler(b *broker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		name, err := queueName(r.URL.Path)
		if err != nil {
			http.NotFound(w, r)
			return
		}

		switch r.Method {
		case http.MethodPut:
			// Presence of the key is what matters; an explicitly empty
			// "v=" is accepted as an empty message.
			values, ok := r.URL.Query()["v"]
			if !ok {
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			b.put(name, values[0])

		case http.MethodGet:
			timeout, ok := parseTimeout(r)
			if !ok {
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			msg, ok := b.getMessage(name, timeout)
			if !ok {
				w.WriteHeader(http.StatusNotFound)
				return
			}
			// A failed write means the client is gone; there is nothing
			// useful left to do with the error.
			_, _ = w.Write([]byte(msg))

		default:
			w.WriteHeader(http.StatusMethodNotAllowed)
		}
	}
}

// main starts the broker on the TCP port given as the sole command-line
// argument and serves until the listener fails.
func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: queue-broker <port>")
		os.Exit(1)
	}

	b := &broker{queues: make(map[string]*queue)}

	// A private mux keeps the usual path cleaning for "/" without registering
	// on the process-wide default mux.
	mux := http.NewServeMux()
	mux.HandleFunc("/", queueHandler(b))

	srv := &http.Server{
		Addr:    ":" + os.Args[1],
		Handler: mux,
		// Bound how long a client may take to send its request headers so an
		// idle connection cannot hold a goroutine forever. No read or write
		// timeout is set: a GET may legitimately block for as long as its
		// timeout parameter asks.
		ReadHeaderTimeout: 10 * time.Second,
	}
	if err := srv.ListenAndServe(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}