RabbitMQ module

The RabbitMQ module provides integration with RabbitMQ. It uses the fs2-rabbit library.

RabbitMQ Module Configuration

The RabbitMQ configuration is defined in the Config case class.

It includes the following fields:

  • host: the RabbitMQ host

  • port: the RabbitMQ port

  • virtualHost: the RabbitMQ virtual host

  • connectionTimeout: the connection timeout

  • ssl: enable SSL mode

  • username: RabbitMQ username

  • password: RabbitMQ password

  • requeueOnNack: requeue messages when not ACK-ed

  • requeueOnReject: requeue messages when rejected

  • internalQueueSize: client internal queue size

  • requestedHeartbeat: heartbeat interval

  • automaticRecovery: automatically reconnect on failure

  • clientProvidedConnectionName: client label

The configuration is read from the application’s configuration file under the rabbitmq section.

Using the RabbitMQ Module

To use the RabbitMQ module, you need to import it and then access it through the Pillars instance:

import pillars.redis.*

val rabbitmqModule = pillarsInstance.redis

You can also use directly RabbitMQ[F]. You can then use the rabbitmqModule to perform RabbitMQ operations.

RabbitMQ Operations

import pillars.redis.*

for
    client <- RabbitMQ[IO](configFor(container)).map(_.client)
    _      <- client.createConnectionChannel.evalMap { implicit channel =>
                  for
                      publisher  <- client.createPublisher[String](exchange, routingKey)
                      _          <- publisher("test message")
                      subscriber <- client.createAutoAckConsumer[String](queue)
                      out        <- subscriber.head.compile.onlyOrError
                  yield assertEquals(out.payload, "test message")
              }
yield ()
end for