From 5c73a3d3cd49c42c821a0b654cd265bbb69c4df1 Mon Sep 17 00:00:00 2001 From: alisherrusinov Date: Mon, 22 Jun 2026 13:14:00 +0500 Subject: [PATCH] init commit --- main.go | 153 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 main.go diff --git a/main.go b/main.go new file mode 100644 index 0000000..5664868 --- /dev/null +++ b/main.go @@ -0,0 +1,153 @@ +package main + +import ( + "errors" + "fmt" + "net/http" + "os" + "strconv" + "strings" + "sync" + "time" +) + +type waiter struct { + ch chan string +} + +type queue struct { + items []string + waiters []*waiter +} + +type broker struct { + mu sync.Mutex + queues map[string]*queue +} + +func (b *broker) getOrCreate(name string) *queue { + q := b.queues[name] + if q == nil { + q = &queue{} + b.queues[name] = q + } + return q +} + +func (b *broker) put(name, msg string) { + b.mu.Lock() + defer b.mu.Unlock() + + q := b.getOrCreate(name) + if len(q.waiters) > 0 { + w := q.waiters[0] + q.waiters = q.waiters[1:] + w.ch <- msg + return + } + q.items = append(q.items, msg) +} + +func (b *broker) getMessage(name string, timeout time.Duration) (string, bool) { + b.mu.Lock() + q := b.getOrCreate(name) + if len(q.items) > 0 { + msg := q.items[0] + q.items = q.items[1:] + b.mu.Unlock() + return msg, true + } + if timeout <= 0 { + b.mu.Unlock() + return "", false + } + + w := &waiter{ch: make(chan string, 1)} + q.waiters = append(q.waiters, w) + b.mu.Unlock() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case msg := <-w.ch: + return msg, true + case <-timer.C: + b.mu.Lock() + q = b.getOrCreate(name) + for i, queued := range q.waiters { + if queued == w { + q.waiters = append(q.waiters[:i], q.waiters[i+1:]...) + b.mu.Unlock() + return "", false + } + } + b.mu.Unlock() + return <-w.ch, true + } +} + +func parseTimeout(r *http.Request) (time.Duration, bool) { + raw := r.URL.Query().Get("timeout") + if raw == "" { + return 0, true + } + n, err := strconv.Atoi(raw) + if err != nil || n < 0 { + return 0, false + } + return time.Duration(n) * time.Second, true +} + +func queueName(path string) (string, error) { + name := strings.TrimPrefix(path, "/") + if name == "" || strings.Contains(name, "/") { + return "", errors.New("bad queue name") + } + return name, nil +} + +func main() { + if len(os.Args) != 2 { + fmt.Fprintln(os.Stderr, "usage: queue-broker ") + os.Exit(1) + } + + b := &broker{queues: make(map[string]*queue)} + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + name, err := queueName(r.URL.Path) + if err != nil { + http.NotFound(w, r) + return + } + + switch r.Method { + case http.MethodPut: + values, ok := r.URL.Query()["v"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + b.put(name, values[0]) + case http.MethodGet: + timeout, ok := parseTimeout(r) + if !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + msg, ok := b.getMessage(name, timeout) + if !ok { + w.WriteHeader(http.StatusNotFound) + return + } + _, _ = w.Write([]byte(msg)) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } + }) + + if err := http.ListenAndServe(":"+os.Args[1], nil); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +}