Published on

Implementasi RabbitMQ Multi Queue Exchange Topic dengan golang

Authors

Halo bro Dev, terima kasih sebelumnya telah meluangkan waktu untuk membaca artikel saya.

RabbitMQ adalah salah satu jenis broker yang sudah familiar dan ringan untuk pengembangan konsep message broker. Tools ini memudahkan kita dalam membangun aplikasi yang mengimplementasikan konsep asynchronous (background job, non-blocking I/O). Dalam contoh kasus lain, message broker biasanya diterapkan pada konsep microservices EDA (Event-Driven Architecture) atau microservices yang menggunakan konsep CQRS. Tapi topik tersebut akan kita bahas di artikel selanjutnya, bro.

Kita kembali ke judul bro, secara umum konsep/pola bawaan RabbitMQ yang biasa digunakan itu ada 6 yaitu:

  1. Simple Queue
  2. Work Queue / Task Queue
  3. Publish Subscribe (fanout)
  4. Publish Subscribe berdasarkan routing
  5. Publish Subscribe berdasarkan topik
  6. Publish Subscribe berdasarkan header

Namun, di artikel ini saya akan menjelaskan konsep nomor 5 yang nantinya akan diimplementasikan dengan menggunakan Go.

Publish Subscribe berdasarkan topik

rabbit-pattern-5

RabbitMQ dibangun menggunakan konsep queuing, jadi tidak secara eksplisit menggunakan konsep topik. Maka dari itu, jika ingin menerapkan konsep multi topik, kita perlu mendefinisikan exchange dengan tipe topic. Kemudian perlu didefinisikan juga binding dan routing berdasarkan queue_name yang sudah kita definisikan. Langsung saja kita ke contoh penamaan:

Step 1 : mendefine topik exchange
exchange = "events"
type = "topic"

Step 2 : mendefine nama Queue
queue_name1 = "events.football"
queue_name2 = "events.basketball"

Step 3 : mendefinisikan binding dan route
queue_name1 = "events.football"
routingKey1 = "events.sports.football.*"

queue_name2 = "events.basketball"
routingKey2 = "events.sports.basketball.*"

queue_name1 = "events.football"
routingKey3 = "events.*"

queue_name2 = "events.basketball"
routingKey4 = "events.*"

Step 4 : implementasi

Langsung saja kita masuk ke implementasi kodenya:

Consumer

  1. rabbitcon.go
    package rabbit

    // RabbitMQ bundles the AMQP connection, the channel opened on that
    // connection, and the configuration they were created from.
    type RabbitMQ struct {
        conn    *amqp.Connection // broker connection
        channel *amqp.Channel    // channel used for queue declare/bind/consume calls
        config  *config.Config   // read for the exchange name (RabbitExchange)
    }

    // NewRabbitMQExchange dials the broker described by cfg, opens a channel on
    // the connection and declares the durable topic exchange cfg.RabbitExchange.
    //
    // If opening the channel or declaring the exchange fails, the connection that
    // was already established is closed before returning, so a failed call does
    // not leak it. The returned *RabbitMQ is nil whenever the error is non-nil;
    // errors are wrapped with %w so callers can still inspect the cause.
    func NewRabbitMQExchange(cfg *config.Config) (*RabbitMQ, error) {
        url := fmt.Sprintf("amqp://%v:%v@%v:%v/", cfg.RabbitUsername, cfg.RabbitPassword, cfg.RabbitHost, cfg.RabbitPort)

        conn, err := amqp.Dial(url)
        if err != nil {
            // Mention only host and port: the full URL embeds the password.
            return nil, fmt.Errorf("dialing rabbitmq at %v:%v: %w", cfg.RabbitHost, cfg.RabbitPort, err)
        }

        channel, err := conn.Channel()
        if err != nil {
            // Best-effort cleanup; the channel error is the one worth reporting.
            _ = conn.Close()
            return nil, fmt.Errorf("opening rabbitmq channel: %w", err)
        }

        err = channel.ExchangeDeclare(
            cfg.RabbitExchange, // exchange name
            "topic",            // exchange type
            true,               // durable
            false,              // auto-deleted
            false,              // internal
            false,              // no-wait
            nil,                // arguments
        )
        if err != nil {
            // Best-effort cleanup; closing the connection also ends its channels.
            _ = conn.Close()
            return nil, fmt.Errorf("declaring topic exchange %v: %w", cfg.RabbitExchange, err)
        }

        return &RabbitMQ{
            conn:    conn,
            channel: channel,
            config:  cfg,
        }, nil
    }
  1. consume.go
    // ConsumeAll declares the durable queue queueName, binds it to the exchange
    // named by r.config.RabbitExchange using routeKey, and starts an auto-ack
    // consumer on that queue.
    //
    // Every delivery body is forwarded as a *domain.Message on the returned
    // channel. The channel is unbuffered, so each delivery is held until the
    // caller receives it; it is closed once the delivery stream ends, which lets
    // callers range over it. An error is returned if declaring, binding or
    // starting the consumer fails.
    func (r *RabbitMQ) ConsumeAll(queueName, routeKey string) (<-chan *domain.Message, error) {
        // Make sure the queue exists before binding to it.
        _, err := r.channel.QueueDeclare(
            queueName, // queue name
            true,      // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        if err != nil {
            return nil, err
        }

        // Bind the queue to the exchange with the (possibly wildcard) routing key.
        err = r.channel.QueueBind(
            queueName,               // queue name
            routeKey,                // routing key pattern
            r.config.RabbitExchange, // exchange name
            false,                   // no-wait
            nil,                     // arguments
        )
        if err != nil {
            return nil, err
        }

        // Start consuming from the queue; deliveries are acknowledged automatically.
        msgs, err := r.channel.Consume(
            queueName, // queue name
            "",        // consumer
            true,      // auto-ack
            false,     // exclusive
            false,     // no-local
            false,     // no-wait
            nil,       // arguments
        )
        if err != nil {
            return nil, err
        }

        messages := make(chan *domain.Message)

        go func() {
            // Close the output when the delivery stream ends so receivers are
            // not left blocked forever on a dead consumer.
            defer close(messages)

            for msg := range msgs {
                messages <- &domain.Message{
                    Body: string(msg.Body),
                }

                // The cases are the queue names, not the routing keys; the
                // previous "events.sports.*" cases never matched a queue name.
                switch queueName {
                case "events.football", "events.basketball":
                    fmt.Println("Dari ", queueName, string(msg.Body))
                }
            }
        }()

        return messages, nil
    }
  1. runner.go
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

    // Queue name -> routing key used when the queue is bound to the exchange.
    queues := map[string]string{
        "events.football":   "events.sports.football.*",
        "events.basketball": "events.sports.basketball.*",
    }

    // One goroutine per queue; Wait reports the first non-nil error.
    g, _ := errgroup.WithContext(context.Background())

    for queueName, routeKey := range queues {
        // Per-iteration copies so each closure captures its own pair
        // (required before Go 1.22).
        queueName := queueName
        routeKey := routeKey
        g.Go(func() error {
            // NOTE(review): StartConsumingAll is not shown here; presumably it
            // binds the queue with the routing key and consumes from it.
            return consumerHandler.StartConsumingAll(queueName, routeKey)
        })
    }

    errGo := g.Wait()
    if errGo != nil {
        // Report the failure once and exit; the error used to be printed twice.
        log.Fatalln("[Server][Error]: ", errGo)
    }

Publisher

  1. rabbitcon.go
    // RabbitMQ holds the AMQP connection and the channel opened on it. Both
    // fields are exported so other packages can use them directly.
    type RabbitMQ struct {
        Conn    *amqp.Connection // broker connection
        Channel *amqp.Channel    // channel opened on Conn
    }

    // NewRabbitMQ connects to the broker configured in cfg.Rabbit, opens a channel
    // and declares the durable topic exchange cfg.Rabbit.RabbitExchange.
    //
    // Each failure is reported through LogChecklist and returned to the caller
    // with a zero RabbitMQ. Previously errors were swallowed, so a failed dial
    // led to a nil-pointer panic on conn.Channel(). A connection that was already
    // opened is closed before an error is returned.
    func NewRabbitMQ(cfg env.Env, zapLogger zapLog.Logger) (RabbitMQ, error) {
        url := fmt.Sprintf("amqp://%v:%v@%v:%v/", cfg.Rabbit.RabbitUsername, cfg.Rabbit.RabbitPassword, cfg.Rabbit.RabbitHost, cfg.Rabbit.RabbitPort)

        conn, err := amqp.Dial(url)
        if err != nil {
            // Log only host and port: the full URL embeds the password.
            zapLogger.Zap.Info("Url: ", fmt.Sprintf("amqp://%v:%v/", cfg.Rabbit.RabbitHost, cfg.Rabbit.RabbitPort))
            LogChecklist("Rabbitmq Connection Refused", false)
            return RabbitMQ{}, fmt.Errorf("dialing rabbitmq: %w", err)
        }

        channel, err := conn.Channel()
        if err != nil {
            LogChecklist("RabbitMQ Connection to channel failed", false)
            // Best-effort cleanup; the channel error is the one worth reporting.
            _ = conn.Close()
            return RabbitMQ{}, fmt.Errorf("opening rabbitmq channel: %w", err)
        }

        err = channel.ExchangeDeclare(
            cfg.Rabbit.RabbitExchange, // exchange name
            "topic",                   // exchange type
            true,                      // durable
            false,                     // auto-deleted
            false,                     // internal
            false,                     // no-wait
            nil,                       // arguments
        )
        if err != nil {
            LogChecklist("RabbitMQ define exchange event failed", false)
            // Best-effort cleanup; closing the connection also ends its channels.
            _ = conn.Close()
            return RabbitMQ{}, fmt.Errorf("declaring topic exchange %v: %w", cfg.Rabbit.RabbitExchange, err)
        }
        LogChecklist("RabbitMq Connection Established", true)

        return RabbitMQ{
            Conn:    conn,
            Channel: channel,
        }, nil
    }

  1. publish.go
    // PublishDefinition is the contract for publishing a message to the broker.
    type PublishDefinition interface {
        // ProduceMessage publishes message with routingKey; queueName names the
        // queue associated with that key. A non-nil error reports failure.
        ProduceMessage(queueName, routingKey string, message models.Message) error
    }
    // PublishRepo publishes messages through a RabbitMQ channel.
    type PublishRepo struct {
        logger logger.Logger // logs successful publishes
        rabbit lib.RabbitMQ  // provides the channel used for broker calls
        env    env.Env       // read for the exchange name (Rabbit.RabbitExchange)
    }

    // NewPublishRepo assembles a PublishRepo from its dependencies and returns it
    // as a PublishDefinition.
    func NewPublishRepo(logger logger.Logger, rabbit lib.RabbitMQ, env env.Env) PublishDefinition {
        var repo PublishRepo
        repo.logger = logger
        repo.rabbit = rabbit
        repo.env = env
        return repo
    }

    // failOnError logs msg followed by err when err is non-nil and does nothing
    // otherwise. Despite its name it neither aborts nor returns the error.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        log.Printf("%s: %s", msg, err)
    }
    // ProduceMessage JSON-encodes message and publishes it to the exchange named
    // by p.env.Rabbit.RabbitExchange using routingKey.
    //
    // Before publishing it (re)declares the durable topic exchange, declares the
    // durable queue queueName and binds that queue to the exchange with
    // routingKey. The first failing step is logged through failOnError and its
    // error is returned; previously every error was swallowed and nil was always
    // returned. Returns nil only after the publish call succeeded.
    func (p PublishRepo) ProduceMessage(queueName, routingKey string, message models.Message) error {
        exchangeName := p.env.Rabbit.RabbitExchange

        // Encode first so an unencodable message never touches the broker.
        msg, err := json.Marshal(message)
        if err != nil {
            failOnError(err, "Failed to encode the message")
            return err
        }

        err = p.rabbit.Channel.ExchangeDeclare(
            exchangeName, // exchange name
            "topic",      // exchange type
            true,         // durable
            false,        // auto-deleted
            false,        // internal
            false,        // no-wait
            nil,          // arguments
        )
        if err != nil {
            failOnError(err, "Failed to declare an exchange")
            return err
        }

        _, err = p.rabbit.Channel.QueueDeclare(
            queueName, // queue name
            true,      // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        if err != nil {
            failOnError(err, "Failed to declare a queue")
            return err
        }

        // Bind the queue to the exchange with the specified routing key.
        err = p.rabbit.Channel.QueueBind(
            queueName,    // queue name
            routingKey,   // routing key
            exchangeName, // exchange name
            false,        // no-wait
            nil,          // arguments
        )
        if err != nil {
            failOnError(err, "Failed to bind a queue")
            return err
        }

        err = p.rabbit.Channel.Publish(
            exchangeName, // exchange
            routingKey,   // routing key
            false,        // mandatory
            false,        // immediate
            amqp.Publishing{
                // NOTE(review): the body is JSON; "application/json" may be the
                // more accurate content type. Left unchanged for consumers.
                ContentType: "text/plain",
                Body:        msg,
            },
        )
        if err != nil {
            failOnError(err, "Failed to publish a message")
            return err
        }

        // Infof, not Info: the message contains a format verb.
        p.logger.Zap.Infof("Message published: %s", message.Body)
        return nil
    }