推送数据到MQ消息队列

推送CSV数据至RabbitMQ队列,验证消费端的程序是否正常
Python代码:

import csv
import logging
import sys

import pika
from pika.exceptions import AMQPConnectionError

# 设置日志级别为DEBUG
logging.basicConfig(level=logging.ERROR)

# RabbitMQ连接信息
RABBITMQ_HOST = '192.168.xxx.xxx'
RABBITMQ_PORT = 5672
RABBITMQ_USERNAME = 'guest'
RABBITMQ_PASSWORD = 'guest'
RABBITMQ_QUEUE = 'abnormalExchange.xxxxxx'
RABBITMQ_EXCHANGE = 'abnormalExchange'

# 读取CSV文件的路径
CSV_FILE_PATH = 'mq_log_202404191532.csv'

class RabbitMQManager:
    def __init__(self):
        self.connection = None
        self.channel = None

    def connect(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=RABBITMQ_HOST,
                port=RABBITMQ_PORT,
                credentials=pika.PlainCredentials(
                    username=RABBITMQ_USERNAME,
                    password=RABBITMQ_PASSWORD
                )
            )
        )
        self.channel = self.connection.channel()
        self.channel.queue_bind(exchange='abnormalExchange', queue=RABBITMQ_QUEUE)

    def send_message(self, message):
        try:
            self.channel.basic_publish(
                exchange=RABBITMQ_EXCHANGE,
                routing_key=RABBITMQ_QUEUE,
                body=message
            )
            logging.info(f"Sent message: {message}")
        except AMQPConnectionError as e:
            logging.error(f"Error sending message: {e}")

    def close_connection(self):
        if self.connection and not self.connection.is_closed:
            self.connection.close()

def read_csv_data(csv_file, start, count):
    # 设置CSV模块的字段大小限制为最大整数值
    csv.field_size_limit(sys.maxsize)
    data = []
    with open(csv_file, 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        for i, row in enumerate(reader):
            if i < start:
                continue
            if i >= start + count:
                break
            # 替换两个双引号为一个双引号
            #row = [cell.strip('"') for cell in row]
            row = [cell.replace('companyName', 'factory') for cell in row]
            row = [cell.replace('""', '"') for cell in row]
            data.append(row)
    return data

def process_data(row):
    # 这里可以根据实际需求处理数据,这里只是简单示例
    return ', '.join(row)

def main():
    start = 300000  # 起始行
    count = 1300000  # 读取的行数
    data = read_csv_data(CSV_FILE_PATH, start, count)
    rabbitmq_manager = RabbitMQManager()
    rabbitmq_manager.connect()

    for row in data:
        processed_data = process_data(row)
        # print(processed_data.strip('"').strip('"'))
        # exit()
        rabbitmq_manager.send_message(processed_data.strip('"').strip('"'))

    rabbitmq_manager.close_connection()

if __name__ == "__main__":
    main()