Sangil's blog

https://github.com/ChoiSangIl Admin

nodeJs rabbitMQ 연동(메세지 송수신) DEV / SERVER

2020-02-04 posted by sang12


nodeJs에서 rabbitMQ와 연동하여 메세지를 전달하고 전달받는 방법을 알아 보겠습니다. 아래는 소스와 설명입니다.

-send.js

var amqp = require('amqplib/callback_api');

//amqp://admin:admin@localhost admin:admin = rabbitmq 계정:암호
amqp.connect('amqp://admin:admin@localhost', function(error0, connection) {
    if (error0) {
        throw error0;
    }

    connection.createChannel(function(error1, channel) {
        if (error1) {
            throw error1;
        }

        //queue name
        var queue = 'test';

        /*
        * queue가 없으면 만들어줌
        * durable : true -> queue 데이터를  rabbitmq가 재시작해도 가지고 있음(소비하기전까지)
        */
        channel.assertQueue(queue, {
            durable: true
        });
        setInterval(sendToQueue, 1000, channel, queue)
    });

    setTimeout(function() {
        connection.close();
        process.exit(0);
    }, 50000);
});

function sendToQueue(channel, queue){
  var msg = 'Hello World! transDate:' + new Date();
  channel.sendToQueue(queue, Buffer.from(msg));
  console.log(" [x] Sent %s", msg);
}

1. npm install ampqlib으로 ampqlib를 받아옵니다.
2. amqp.connect로 ampq와의 커넥션을 생성합니다.
3. 커넥션이 연결됬으면 createChannel을 이용하여 rabbitMq와 채널을 생성합니다.
4. 접근할 queue명을 설정하고 assertQueue명령어를 통해 큐가 존재하지 않으면 해당 큐를 만들어줍니다. durable을 true로 설정하면 rabiitMQ가 다운되거나 종료되어도 큐에 데이터를 유지합니다.(파일에 씀)
5. setInterval 함수를 이용하여 1초마다 큐에 Hello World와 시간을 전송합니다.
6. 타임아웃 시간이 지나면 커넥션을 닫고 프로그램을 종료합니다.

-receive.js

var amqp = require('amqplib/callback_api');

//amqp://admin:admin@localhost admin:admin = rabbitmq 계정:암호
amqp.connect('amqp://admin:admin@localhost', function(error0, connection) {
        if (error0) {
    	     throw error0;
        }

	connection.createChannel(function(error1, channel) {
	    if (error1) {
                throw error1;
            }

	//queue name
        var queue = 'test';

        /*
        * queue가 없으면 만들어줌
        * durable : true -> queue 데이터를  rabbitmq가 재시작해도 가지고 있음(소비하기전까지)
        */
        channel.assertQueue(queue, {
            durable: true,
        });

        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);

	//prefetch를 설정해두면 큐에서 최대 10개만 가져감.
        channel.prefetch(10);
        channel.consume(queue, function(msg) {
		console.log(" [x] Received %s", msg.content.toString());
		//Ack 메세지를 보내야 큐에서 제거함
		channel.ack(msg);
		//channel.nack(msg);
        }, {
            //noAck: true 이면 queue에서 데이터를 가져간다음 Ack를 바로 반환함으로 가져가자마자 queue에서 지워버림, ack를 받았을 경우만 큐에서 제거하기 위해 false로 설정
            noAck: false
        });
    });
});

-보내는부분과 중복되는 부분은 생략
1. prefetch를 설정하여 큐에서 가져올 최대 수를 설정합니다.
2. channel.consume을 이용하여 queue에서 데이터를 받아옵니다. 데이터를 수신시 해당 콜백 메서드가 호출됩니다.
3. queue에서 가져온 문자를 찍고 ack 메세지를 전달합니다. noAck: true이면 ack수신여부와 상관없이 queue에서 데이터를 삭제합니다. noAck true라면 rabiitMQ는 ack메세지를 받아야 해당 메세지가 정상적으로 보내졌다는 것을 확인하고 큐에서 제거합니다.

#node js rabbitMQ연동 #node js rabbitMQ #rabbitMQ nodejs연동
REPLY