Rabbit MQ เครื่องมือจัดการ Asynchronous อย่างเป็นระบบ
Synchronous / Asynchronous เข้ามามีบทบาทมากขึ้นเมื่อเรา implement Microservices เมื่อเรามี services เยอะขึ้น การคุยกันก็มีความซับซ้อนมากขึ้น ถ้าพูดในมุมของ client ก็คงต่างกันแค่ รอ และ ไม่รอ เท่านั้นเอง
Messaging Queue (MQ) ก็เกิดมาเพื่อเป็นตัวกลางระหว่างผู้รับและผู้ส่ง และยังมี protocol เฉพาะอย่าง AMQP ที่กำหนดเป็นมาตรฐาน เพื่อการันตีข้อมูลว่าจะไม่หายระหว่างทางด้วย ซึ่งการใช้งานนั้นจำเป็นต้องอาศัยเครื่องมืออย่าง Message Broker เพื่อควบคุม queue อีกที
Message Broker เป็น software ที่เราต้องติดตั้งบน server ไว้ด้วย เพื่อ tracking & monitoring รวมถึงมี console ให้สามารถทดสอบส่งข้อมูลเข้า queue โดยไม่ต้องเขียนโค้ดเอง ตัวอย่างเช่น Rabbit MQ, Apache Kafka และอื่นๆ อีกมากมาย
Rabbit MQ เป็น Message Broker ที่ได้รับความนิยมอีกตัวหนึ่ง สามารถใช้งานได้ฟรี และมีความ lightweight กว่าเจ้าอื่น อีกทั้งยังมี web console ให้ผู้ดูแลเข้ามาใช้งานง่ายๆ ด้วย
แนะตัวกันก่อน
- Producer / Publisher / Sender = ผู้ส่งข้อมูล
- Consumer / Subscriber / Receiver = ผู้รับข้อมูล
ส่วนนี้ใช้การเขียนโปรแกรมแบบ PubSub Design Pattern นั่นเอง และในส่วนของ Rabbit MQ จะมี component ดังนี้
- Exchange = รูปแบบการส่ง
- Queue = ก้อนข้อมูล
- Binding / Routing = เส้นทาง
สิ่งที่ RabbitMQ จะทำให้คือ การจัดคิว และการควบคุมเส้นทางการส่งข้อมูล โดยเราสามารถเลือกใช้ตามประเภทที่เราต้องการได้
จากภาพจะเห็นว่า Rabbit MQ หรือ Broker ของเรานั้น จะมีหน้าที่ควบคุมเมื่อได้รับข้อมูลมาแล้ว ควรจะส่งต่อให้ใครบ้าง โดยเงื่อนไขการส่งต่อนั้นขึ้นอยู่กับประเภทของ Exchange และ Binding
ถ้าภาพยังชัดไม่มากพอ ลองดู vdo จาก cloudamqp ก็ได้
เริ่มเข้าสู่โหมด Coding กัน
ขาส่ง
- Producer จะต้องเชื่อมต่อ AMQP ดังนี้ amqp://guest:[email protected]:5672/
- Producer จะต้องสร้าง Exchange พร้อมกำหนดประเภท
- เมื่อต้องการส่งข้อมูล Producer จะต้อง publish โดยใช้ routing key* และ
package mq
import (
"fmt"
"github.com/streadway/amqp"
"log"
"net/url"
"os"
)
type Publisher struct {
Conn *amqp.Connection
}
func getMQConnection() string {
u := &url.URL{
Scheme: "amqp",
User: url.UserPassword(os.Getenv("MQ_USERNAME"), os.Getenv("MQ_PASSWORD")),
Host: fmt.Sprintf("%s:%s", os.Getenv("MQ_HOST"), os.Getenv("MQ_PORT")),
}
//"amqp://guest:[email protected]:5672/"
return u.String()
}
func openMQConnection() *amqp.Connection {
conn, err := amqp.Dial(getMQConnection())
failOnError(err, "Failed to connect to RabbitMQ")
return conn
}
func CreateMQService() *Publisher {
pub := &Publisher{
Conn: openMQConnection(),
}
return pub
}
func (pub *Publisher) Publish(exchangeName string, routeKey string, body string) {
ch, err := pub.Conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
exchangeName, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
err = ch.Publish(
exchangeName, // exchange
routeKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
ขารับ
- Consumer จะต้องเชื่อมต่อ AMQP เช่น amqp://guest:[email protected]:5672/
- Consumer จะต้องสร้าง Queue
- Bind ระหว่าง่ Queue กับ Exchange ด้วย routing key ตามเงื่อนไข
package mq
import (
"github.com/streadway/amqp"
"log"
)
func Consume(queueName string) {
conn, err := amqp.Dial(getMQConnection())
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
queueName, // queue name
"key", // routing key
"my-ex", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}