We will let KrakenD publish and consume messages from RabbitMQ.
sequenceDiagram
Client ->> API-KrakenD: Send a POST
API-KrakenD -->> RabbitMQ: Publish the payload
In this scenario, KrakenD provides an out-of-the-box backend that publishes the message to the specified exchange in RabbitMQ.
Please run the following Docker command:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
The result will be a console log screen like the following:
Configuring logger redirection
2020-11-17 20:52:35.100 [debug] <0.287.0> Lager installed handler error_logger_lager_h into error_logger
2020-11-17 20:52:35.487 [info] <0.272.0>
Starting RabbitMQ 3.8.9 on Erlang 23.1.2
Copyright (c) 2007-2020 VMware, Inc. or its affiliates.
Licensed under the MPL 2.0. Website: https://rabbitmq.com
  ##  ##      RabbitMQ 3.8.9
  ##  ##
  ##########  Copyright (c) 2007-2020 VMware, Inc. or its affiliates.
  ######  ##
  ##########  Licensed under the MPL 2.0. Website: https://rabbitmq.com
Doc guides: https://rabbitmq.com/documentation.html
Support: https://rabbitmq.com/contact.html
Tutorials: https://rabbitmq.com/getstarted.html
Monitoring: https://rabbitmq.com/monitoring.html
Logs: <stdout>
Config file(s): /etc/rabbitmq/rabbitmq.conf
Starting broker...2020-11-17 20:52:35.491 [info] <0.272.0>
node : rabbit@1d1aeaabfa13
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.conf
cookie hash : dSjJ44QW4pEza/vazmLqQQ==
log(s) : <stdout>
database dir : /var/lib/rabbitmq/mnesia/rabbit@1d1aeaabfa13
The RabbitMQ Management Console is available at http://127.0.0.1:15672, where "guest" works as both the user and the password.
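Before moving on, you may want to confirm that the broker is really accepting AMQP connections. Here is a minimal check in Go, a sketch assuming the github.com/streadway/amqp client and the default guest credentials:

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func main() {
    // Dial the broker started by the docker command above.
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("RabbitMQ is not reachable: %s", err)
    }
    defer conn.Close()
    log.Println("RabbitMQ is up and accepting AMQP connections")
}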
This post is based on the following documentation page: https://www.krakend.io/docs/backends/pubsub/. Here we will use KrakenD to automatically define a backend that publishes a payload as a message, and then read it back (consuming).
If the backend that interacts with your messaging layer demands a lot of complexity, business rules, and so on, we recommend creating a dedicated backend to handle it properly, with no limitations and able to do whatever you need.
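As an illustration, here is a minimal sketch of such a dedicated backend in Go, assuming the github.com/streadway/amqp client, a hypothetical /orders route, and port 9000 (none of these appear in the KrakenD example below); KrakenD would simply proxy to this service instead of talking to RabbitMQ directly:

package main

import (
    "io/ioutil"
    "log"
    "net/http"

    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("connect to RabbitMQ: %s", err)
    }
    defer conn.Close()

    // For simplicity a single channel is shared here; a real service
    // would manage channels per worker or per request.
    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("open channel: %s", err)
    }
    defer ch.Close()

    // Declare the same durable topic exchange used in this example.
    if err := ch.ExchangeDeclare("testdiuscoa", "topic", true, false, false, false, nil); err != nil {
        log.Fatalf("declare exchange: %s", err)
    }

    http.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) {
        body, err := ioutil.ReadAll(r.Body)
        if err != nil || len(body) == 0 {
            http.Error(w, "empty payload", http.StatusBadRequest)
            return
        }
        // Business rules (validation, enrichment, routing decisions) go here.
        err = ch.Publish(
            "testdiuscoa", // exchange
            "Key",         // routing key
            false,         // mandatory
            false,         // immediate
            amqp.Publishing{ContentType: "application/json", Body: body},
        )
        if err != nil {
            http.Error(w, "publish failed", http.StatusBadGateway)
            return
        }
        w.WriteHeader(http.StatusAccepted)
    })

    log.Fatal(http.ListenAndServe(":9000", nil))
}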
For this example, we will use the following krakend.json config file:
{
  "version": 2,
  "port": 8080,
  "endpoints": [
    {
      "endpoint": "/produce/{key}",
      "method": "POST",
      "backend": [
        {
          "extra_config": {
            "github.com/devopsfaith/krakend-amqp/produce": {
              "exchange": "testdiuscoa",
              "durable": true,
              "delete": false,
              "exclusive": false,
              "no_wait": true,
              "mandatory": true,
              "immediate": false,
              "name": "radiuscoa",
              "routing_key": "Key"
            }
          },
          "host": [
            "amqp://guest:guest@localhost:5672"
          ],
          "disable_host_sanitize": true
        }
      ]
    },
    {
      "endpoint": "/consume",
      "backend": [
        {
          "extra_config": {
            "github.com/devopsfaith/krakend-amqp/consume": {
              "name": "queue-1",
              "exchange": "testdiuscoa",
              "durable": true,
              "delete": false,
              "exclusive": false,
              "no_wait": true,
              "no_local": false,
              "routing_key": [
                "#"
              ],
              "prefetch_count": 10
            }
          },
          "host": [
            "amqp://guest:guest@localhost:5672"
          ],
          "disable_host_sanitize": true
        }
      ]
    }
  ]
}
Once KrakenD is installed on your machine, we will use the krakend binary, which is available for many different platforms (https://www.krakend.io/docs/overview/installing/).
Please create a folder named krakend-rabbit on your machine, save the configuration above as a file called krakend.json inside that folder, and run the following command:
krakend run -c krakend.json
The KrakenD API Gateway runtime boots in milliseconds, printing logs and information like the following:
krakend run -c krakend.json
Parsing configuration file: krakend.json
2020/11/18 03:05:59 ERROR: unable to create the gologging logger: getting the extra config for the krakend-gologging module
2020/11/18 03:05:59 ERROR: unable to create the GELF writer: getting the extra config for the krakend-gelf module
2020/11/18 03:05:59 INFO: Listening on port: 8080
2020/11/18 03:05:59 DEBUG: creating a new influxdb client
2020/11/18 03:05:59 DEBUG: no config for the influxdb client. Aborting
2020/11/18 03:05:59 WARNING: influxdb: unable to load custom config
2020/11/18 03:05:59 WARNING: opencensus: no extra config defined for the opencensus module
2020/11/18 03:05:59 WARNING: building the etcd client: unable to create the etcd client: no config
2020/11/18 03:05:59 DEBUG: no config for the bloomfilter
2020/11/18 03:05:59 WARNING: bloomFilter: no config for the bloomfilter
2020/11/18 03:05:59 WARNING: no config present for the httpsecure module
2020/11/18 03:05:59 INFO: registering usage stats for cluster ID 'F8ai6U9zv0qNuwRJrbglzP0A7to7xFVpOn9ujfccbfM='
2020/11/18 03:05:59 DEBUG: lua: no extra config
2020/11/18 03:05:59 DEBUG: botdetector middleware: no config defined for the module
2020/11/18 03:05:59 DEBUG: AMQP: amqp://guest:guest@localhost:5672: no amqp consumer defined
2020/11/18 03:05:59 DEBUG: CEL: no extra config detected for backend /
2020/11/18 03:05:59 DEBUG: lua: no extra config
2020/11/18 03:05:59 DEBUG: CEL: no extra config detected for pipe /produce/:key
2020/11/18 03:05:59 DEBUG: lua: no extra config
2020/11/18 03:05:59 INFO: JOSE: singer disabled for the endpoint /produce/:key
2020/11/18 03:05:59 DEBUG: lua: no extra config
2020/11/18 03:05:59 INFO: JOSE: validator disabled for the endpoint /produce/:key
2020/11/18 03:05:59 DEBUG: botdetector: no config defined for the module
2020/11/18 03:06:00 DEBUG: CEL: no extra config detected for backend /
2020/11/18 03:06:00 DEBUG: lua: no extra config
2020/11/18 03:06:00 DEBUG: CEL: no extra config detected for pipe /consume
2020/11/18 03:06:00 DEBUG: lua: no extra config
2020/11/18 03:06:00 INFO: JOSE: singer disabled for the endpoint /consume
2020/11/18 03:06:00 DEBUG: lua: no extra config
2020/11/18 03:06:00 INFO: JOSE: validator disabled for the endpoint /consume
2020/11/18 03:06:00 DEBUG: botdetector: no config defined for the module
2020/11/18 03:06:00 DEBUG: http-server-handler: no extra config
![Homer Happy](https://media1.giphy.com/media/xT5LMHxhOfscxPfIfm/200.gif)
Now we can test the endpoint exposed by KrakenD to publish a message:
curl -i --location --request POST 'http://localhost:8080/produce/foo' --data-raw '{"foo":"bar"}'
HTTP/1.1 200 OK
**Content-Type**: application/json; charset=utf-8
**X-Krakend**: Version 1.1.1
**X-Krakend-Completed**: false
**Date**: Wed, 18 Nov 2020 06:12:28 GMT
**Content-Length**: 5
null
This endpoint does not return a message body; the response is just null.
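If you prefer to script this test, here is a small sketch using Go's net/http, assuming KrakenD is still listening on localhost:8080:

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

func main() {
    payload := bytes.NewBufferString(`{"foo":"bar"}`)
    resp, err := http.Post("http://localhost:8080/produce/foo", "application/json", payload)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    // A 200 with a null body means KrakenD accepted and published the message.
    fmt.Println("status:", resp.Status)
}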
Now let’s consume the message from RabbitMQ:
curl -i --location --request GET 'http://localhost:8080/consume'
HTTP/1.1 200 OK
**Content-Type**: application/json; charset=utf-8
**X-Krakend**: Version 1.1.1
**X-Krakend-Completed**: true
**Date**: Wed, 18 Nov 2020 06:15:53 GMT
**Content-Length**: 14
{"foo":"bar"}
If you get an error or it does not work as demonstrated, please set the following environment variable: export RABBIT_SERVER_URL='guest:guest@localhost:5672'
This document demonstrates how you can integrate KrakenD with RabbitMQ. For more information, please reach us at: hello@skalena.com
RabbitMQ publisher written in Go (sender.go):
package main

import (
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs_topic", // name
        "topic",      // type
        true,         // durable
        false,        // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    body := bodyFrom(os.Args)
    err = ch.Publish(
        "logs_topic",          // exchange
        severityFrom(os.Args), // routing key
        false,                 // mandatory
        false,                 // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 3) || os.Args[2] == "" {
        s = "hello"
    } else {
        s = strings.Join(args[2:], " ")
    }
    return s
}

func severityFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "anonymous.info"
    } else {
        s = os.Args[1]
    }
    return s
}
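To try the publisher, save it as sender.go in a Go module that requires github.com/streadway/amqp and run, for example, go run sender.go anonymous.info "hello world"; the first argument becomes the routing key and the remaining arguments form the message body.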
Here you can see receiver.go:
package main

import (
    "log"
    "os"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs_topic", // name
        "topic",      // type
        true,         // durable
        false,        // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    if len(os.Args) < 2 {
        log.Printf("Usage: %s [binding_key]...", os.Args[0])
        os.Exit(0)
    }
    for _, s := range os.Args[1:] {
        log.Printf("Binding queue %s to exchange %s with routing key %s",
            q.Name, "logs_topic", s)
        err = ch.QueueBind(
            q.Name,       // queue name
            s,            // routing key
            "logs_topic", // exchange
            false,
            nil)
        failOnError(err, "Failed to bind a queue")
    }

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto ack
        false,  // exclusive
        false,  // no local
        false,  // no wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}
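The receiver takes one or more binding keys as arguments, so go run receiver.go "#" subscribes to every message published on the logs_topic exchange.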