layered architercutre(3 or 4 tier)

image.png

import psycopg2

class UserService:
    def find_user_by_email(self, email: str):
        conn = psycopg2.connect(database="mydb", user="admin")
        cursor = conn.cursor()
        cursor.execute("SELECT id, email FROM users WHERE email = %s", (email,))
        result = cursor.fetchone()
        return {"id": result[0], "email": result[1]}
# interfaces.py
from abc import ABC, abstractmethod

class UserRepository(ABC):
    @abstractmethod
    def find_by_email(self, email: str) -> dict:
        pass

class EventPublisher(ABC):
    """이벤트 발행자 추상 인터페이스"""
    @abstractmethod
    def publish(self, event: Event) -> None:
        pass

# postgres_user_repository.py
import psycopg2
from interfaces import UserRepository

class PostgresUserRepository(UserRepository):
    def find_by_email(self, email: str) -> dict:
        conn = psycopg2.connect(database="mydb", user="admin")
        cursor = conn.cursor()
        cursor.execute("SELECT id, email FROM users WHERE email = %s", (email,))
        result = cursor.fetchone()
        return {"id": result[0], "email": result[1]}

# mysql_user_repository.py
import mysql_opg
from interfaces import UserRepository

class MySqlUserRepository(UserRepository):
    def find_by_email(self, email: str) -> dict:
        conn = mysql_opg.connect(database="mydb", user="admin")
        cursor = conn.cursor()
        cursor.execute("SELECT id, email FROM users WHERE email = %s", (email,))
        result = cursor.fetchone()
        return {"id": result[0], "email": result[1]}

# kafka_event_publisher.py
from event_interfaces import EventPublisher, Event

class KafkaEventPublisher(EventPublisher):
    """Kafka를 사용한 이벤트 발행 구현체"""
    def publish(self, event: Event) -> None:
		    pass

# rabbitmq_event_publisher.py
from event_interfaces import EventPublisher, Event

class RabbitMQEventPublisher(EventPublisher):    
    def publish(self, event: Event) -> None:
        ...

# user_service.py
class UserService:
    def __init__(self, repo: UserRepository, event_publisher: EventPublisher): # 런타임 의존성 발생
        self.repo = repo
        self.event_publisher = event_publisher

    def find_user_by_email(self, email: str):
		    self.event_publisher.publish(event)
        return self.repo.find_by_email(email)

클린 아키텍처

image.png