推送数据到MQ消息队列
- RabbitMQ
- 2024-04-22
- 1173热度
- 0评论
推送CSV数据至RabbitMQ队列,验证消费端的程序是否正常
Python代码:
import csv
import logging
import sys
import pika
from pika.exceptions import AMQPConnectionError
# Emit only ERROR-level (and more severe) log records; the INFO-level
# "Sent message" entries logged by send_message are therefore suppressed.
logging.basicConfig(level=logging.ERROR)
# RabbitMQ connection settings
# NOTE(review): the host below is a redacted placeholder, not a usable address.
RABBITMQ_HOST = '192.168.xxx.xxx'
RABBITMQ_PORT = 5672
RABBITMQ_USERNAME = 'guest'
RABBITMQ_PASSWORD = 'guest'
# Used both as the queue that gets bound and as the routing key when publishing.
RABBITMQ_QUEUE = 'abnormalExchange.xxxxxx'
# Exchange that messages are published to.
RABBITMQ_EXCHANGE = 'abnormalExchange'
# Path of the CSV file to read
CSV_FILE_PATH = 'mq_log_202404191532.csv'
class RabbitMQManager:
    """Thin wrapper around one blocking pika connection and its channel.

    Typical use is ``connect()``, any number of ``send_message()`` calls and
    finally ``close_connection()``. The instance can also be used as a
    context manager, which connects on entry and always closes on exit.
    """

    def __init__(self):
        # Both attributes stay None until connect() succeeds and are reset
        # to None again by close_connection().
        self.connection = None
        self.channel = None

    def __enter__(self):
        """Connect and return the manager itself."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; never suppresses an in-flight exception."""
        self.close_connection()
        return False

    def connect(self):
        """Open a connection and channel and bind the queue to the exchange.

        Any connection that is still held from an earlier call is closed
        first. If opening the channel or binding the queue fails, the freshly
        opened connection is closed before the exception is re-raised, so a
        failed connect never leaves a socket behind.

        Raises:
            Exception: whatever pika raises while connecting, opening the
                channel or binding the queue is propagated unchanged.
        """
        # Avoid leaking a previous connection when connect() is called twice.
        self.close_connection()

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=RABBITMQ_HOST,
                port=RABBITMQ_PORT,
                credentials=pika.PlainCredentials(
                    username=RABBITMQ_USERNAME,
                    password=RABBITMQ_PASSWORD,
                ),
            )
        )
        try:
            channel = connection.channel()
            # Bind against the same exchange constant that send_message
            # publishes to, so the two can never drift apart.
            channel.queue_bind(exchange=RABBITMQ_EXCHANGE, queue=RABBITMQ_QUEUE)
        except Exception:
            if not connection.is_closed:
                connection.close()
            raise

        self.connection = connection
        self.channel = channel

    def send_message(self, message):
        """Publish ``message`` to RABBITMQ_EXCHANGE, routed by RABBITMQ_QUEUE.

        Connection-level AMQP errors are logged at ERROR level and swallowed
        (best-effort delivery); other exceptions propagate to the caller.

        Args:
            message: Message body handed to ``basic_publish``.

        Raises:
            RuntimeError: if no channel is open, i.e. connect() has not been
                called or the connection has already been closed.
        """
        if self.channel is None:
            raise RuntimeError(
                "RabbitMQManager.connect() must be called before send_message()"
            )
        try:
            self.channel.basic_publish(
                exchange=RABBITMQ_EXCHANGE,
                routing_key=RABBITMQ_QUEUE,
                body=message,
            )
        except AMQPConnectionError as e:
            logging.error("Error sending message: %s", e)
        else:
            # Lazy %-style arguments: the (possibly large) message is only
            # formatted when INFO logging is actually enabled.
            logging.info("Sent message: %s", message)

    def close_connection(self):
        """Close the connection if it is open and forget connection/channel.

        Safe to call repeatedly and on a manager that never connected.
        """
        connection = self.connection
        # Drop the references first so a closed channel is never reused.
        self.connection = None
        self.channel = None
        if connection is not None and not connection.is_closed:
            connection.close()
def _raise_csv_field_size_limit():
    """Raise the csv field-size limit to the largest value the platform accepts."""
    limit = sys.maxsize
    while True:
        try:
            csv.field_size_limit(limit)
            return
        except OverflowError:
            # The limit is stored in a C long, which can be narrower than
            # sys.maxsize (e.g. on 64-bit Windows); halve until it fits.
            limit //= 2


def read_csv_data(csv_file, start, count):
    """Read a window of rows from a UTF-8 CSV file and clean up each cell.

    Rows are counted from 0 in file order (a header row, if any, counts as
    row 0). In every cell of the selected rows, ``companyName`` is replaced
    by ``factory`` and each doubled double-quote is collapsed to a single one.

    Args:
        csv_file: Path of the CSV file to read.
        start: Index of the first row to return.
        count: Maximum number of rows to return.

    Returns:
        A list of rows, each a list of cleaned cell strings. Empty when the
        window lies past the end of the file or ``count`` is not positive.
    """
    _raise_csv_field_size_limit()
    end = start + count
    data = []
    # newline='' hands line-ending handling to the csv module, so line breaks
    # embedded in quoted fields are preserved exactly as written.
    with open(csv_file, 'r', encoding='utf-8', newline='') as file:
        for i, row in enumerate(csv.reader(file)):
            if i < start:
                continue
            if i >= end:
                break
            data.append([
                cell.replace('companyName', 'factory').replace('""', '"')
                for cell in row
            ])
    return data
def process_data(row):
    """Flatten one CSV row into a single string, cells separated by ', '.

    This is only a minimal example transformation; adapt it to whatever
    message format is actually required.
    """
    separator = ', '
    message = separator.join(row)
    return message
def main(start=300000, count=1300000):
    """Replay a window of rows from the CSV file into RabbitMQ, one message per row.

    Args:
        start: Index of the first CSV row to send.
        count: Maximum number of rows to send.
    """
    # Load the rows before connecting so the broker connection is not held
    # open while the file is being read.
    data = read_csv_data(CSV_FILE_PATH, start, count)
    rabbitmq_manager = RabbitMQManager()
    try:
        rabbitmq_manager.connect()
        for row in data:
            # One strip('"') already removes every leading and trailing quote.
            rabbitmq_manager.send_message(process_data(row).strip('"'))
    finally:
        # Always release the connection, even if connecting or sending fails.
        rabbitmq_manager.close_connection()


if __name__ == "__main__":
    main()