- Published on
Implementasi RabbitMQ Multi Queue Exchange Topic dengan golang
- Authors
- Name
- Aulia' Illahi
- @alfath_go
- Name
Halo bro Dev, terimakasih sebelumnya telah meluangkan waktu untuk membaca artikel saya.
RabbitMQ adalah salah satu jenis broker yang sudah familiar dan ringan untuk pengembangan konsep message broker. Tools ini memudahkan kita dalam membangun aplikasi yg mengimplementasikan konsep asyncronous(background job, non-block io). Dalam contoh kasus lain message broker biasanya diterapkan pada konsep microservices EDA (Event Driven Architeture) atau microservices yg menggunakan konsep CQRS. Tapi topik tersebut akan kita bahas diartikel selanjutanya bro.
Kita kembali ke judul bro, secara umum konsep/pola bawaan RabbitMQ yang biasa digunakan itu ada 6 yaitu:
- Simple Queue
- Work Queue / Task Queue
- Publish Subcribe (fanout)
- Publish Subcribe berdasarkan routing
- Publish Subcribe berdasarkan topik
- Publish Subcribe berdasarkan header
namun di artikel ini saya akan menjelaskan konsep nomor 5 yang nantinya akan dimplementasi dengan menggunakan GO.
Publish Subcribe berdasarkan topik
RabbitMQ dibangun menggunakan konsep queuing, jadi tidak secara explicit menggunakan konsep topik. Maka dari itu jika ingin menerapkan konsep multi topik kita perlu menggunakan/mendefinisikan exchange dengan tipe topik. Kemudian perlu didefiniskan juga binding dan routing berdasarkan queue_name yg sudah kita definisikan. lansung saja kita ke contoh penamaan
Step 1 : mendefine topik exchange
exchange = "events"
type = "topic"
Step 2 : mendefine nama Queue
queue_name1 = "events.football"
queue_name3 = "events.basketball"
Step 3 : mendefinisikan binding dan route
queue_name1 = "events.football"
routingKey1 = "events.sports.football.*"
queue_name2 = "events.basketball"
routingKey2 = "events.sports.basketball.*"
queue_name1 = "events.football"
routingKey3 = "events.*"
queue_name2 = "events.basketball"
routingKey4 = "events.*"
Step 4 : implementasi
Langsung saja kita masuk ke implementasi kodenya:
Consumer
- rabbitcon.go
package rabbit
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
config *config.Config
}
func NewRabbitMQExchange(cfg *config.Config) (*RabbitMQ, error) {
url := fmt.Sprintf("amqp://%v:%v@%v:%v/", cfg.RabbitUsername, cfg.RabbitPassword, cfg.RabbitHost, cfg.RabbitPort)
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
channel, err := conn.Channel()
if err != nil {
return nil, err
}
err = channel.ExchangeDeclare(
cfg.RabbitExchange, // exchange name
"topic", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
return &RabbitMQ{
conn: conn,
channel: channel,
config: cfg,
}, nil
}
- consume.go
func (r *RabbitMQ) ConsumeAll(queueName, routeKey string) (<-chan *domain.Message, error) {
// Consume messages from the specified queue
_, err := r.channel.QueueDeclare(
queueName, // empty queue name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
failOnError(err, "Failed to register a consumer")
// Bind the queue to the exchange with a wildcard routing key
err = r.channel.QueueBind(
queueName, // queue name
routeKey, // wildcard routing key
r.config.RabbitExchange, // exchange name
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
// Consume messages from the unique queue
msgs, err := r.channel.Consume(
queueName, // queue name
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
messages := make(chan *domain.Message)
go func() {
for msg := range msgs {
message := &domain.Message{
Body: string(msg.Body),
}
messages <- message
switch queueName {
case "events.sports.football":
fmt.Println("Dari ", queueName, string(msg.Body))
case "events.sports.basketball":
fmt.Println("Dari ", queueName, string(msg.Body))
}
}
}()
return messages, nil
}
- runner.go
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
// Declare queues with routing keys
queues := map[string]string{
"events.football": "events.sports.football.*",
"events.basketball": "events.sports.basketball.*",
}
g, _ := errgroup.WithContext(context.Background())
for queueName, routeKey := range queues {
queueName := queueName
routeKey := routeKey
g.Go(func() error {
return consumerHandler.StartConsumingAll(queueName, routeKey)
// Bind the queue to the exchange with the specified routing key
})
}
errGo := g.Wait()
if errGo != nil {
log.Println("[Server][Error]: ", errGo)
log.Fatal(errGo)
}
Publisher
- rabbitcon.go
type RabbitMQ struct {
Conn *amqp.Connection
Channel *amqp.Channel
}
func NewRabbitMQ(cfg env.Env, zapLogger zapLog.Logger) (RabbitMQ, error) {
url := fmt.Sprintf("amqp://%v:%v@%v:%v/", cfg.Rabbit.RabbitUsername, cfg.Rabbit.RabbitPassword, cfg.Rabbit.RabbitHost, cfg.Rabbit.RabbitPort)
conn, err := amqp.Dial(url)
if err != nil {
zapLogger.Zap.Info("Url: ", url)
LogChecklist("Rabbitmq Connection Refused", false)
//zapLogger.Zap.Panic(err)
}
channel, err := conn.Channel()
if err != nil {
LogChecklist("RabbitMQ Connection to channel failed", false)
//zapLogger.Zap.Panic(err)
}
err = channel.ExchangeDeclare(
cfg.Rabbit.RabbitExchange, // exchange name
"topic", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
LogChecklist("RabbitMQ define exchange event failed", false)
//zapLogger.Zap.Panic(err)
}
LogChecklist("RabbitMq Connection Established", true)
return RabbitMQ{
Conn: conn,
Channel: channel,
}, nil
}
- publish.go
type PublishDefinition interface {
ProduceMessage(queueName, routingKey string, message models.Message) error
}
type PublishRepo struct {
logger logger.Logger
rabbit lib.RabbitMQ
env env.Env
}
func NewPublishRepo(logger logger.Logger, rabbit lib.RabbitMQ, env env.Env) PublishDefinition {
return PublishRepo{
logger: logger,
rabbit: rabbit,
env: env,
}
}
func failOnError(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func (p PublishRepo) ProduceMessage(queueName, routingKey string, message models.Message) error {
exchangeName := p.env.Rabbit.RabbitExchange
err := p.rabbit.Channel.ExchangeDeclare(
exchangeName, // exchange name
"topic", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
msg, _ := json.Marshal(message)
_, err = p.rabbit.Channel.QueueDeclare(
queueName, // queue name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// Bind the queue to the exchange with the specified routing key
err = p.rabbit.Channel.QueueBind(
queueName, // queue name
routingKey, // routing key
exchangeName, // exchange name
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to bind a queue")
err = p.rabbit.Channel.Publish(
exchangeName, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: msg,
},
)
failOnError(err, "Failed to publish a message")
p.logger.Zap.Info("Message published: %s", message.Body)
return nil
}