Python ve Kafka Kullanan Microservice Mimarisi


Merhaba değerli takipçilerim,

ChatGPT’ten Python ve Kafka kullanarak örnek bir Microservice mimarisi hazırlamasını istedim. Ve cevaplara göre bu makaleyi hazırladım.

Öncelikle Producer, Consumer ve API Gateway sınıflarına ihtiyacımız var ve sonra bu Producer ve Consumer sınıflarını kullanarak mikroservislerimizi kodluyoruz.

Microservice Mimarisi:

  1. Producer
  2. Consumer
  3. API Gateway
  4. Microservice’ler
  5. Zookeeper

1. Producer

Cassandra veritabanından sürekli güncel veriyi alıp Kafkaya gönderen örnek bir Producer yazdırdım:

from cassandra.cluster import Cluster
from kafka import KafkaProducer
import json

cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Create a keyspace and table
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS mykeyspace
    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
""")
session.execute("""
    CREATE TABLE IF NOT EXISTS mykeyspace.data (
        id int PRIMARY KEY,
        value text
    )
""")

# Function to retrieve data from Cassandra
def get_data_from_cassandra():
    rows = session.execute("SELECT * FROM mykeyspace.data")
    data = []
    for row in rows:
        data.append({'id': row.id, 'value': row.value})
    return data

# Function to send data to Kafka topic
def send_data_to_kafka(topic, data):
    for item in data:
        producer.send(topic, value=json.dumps(item))
    producer.flush()

# Continuously poll for new data in Cassandra
while True:
    data = get_data_from_cassandra()
    send_data_to_kafka("example_topic", data)

2. Consumer

from kafka import KafkaConsumer, KafkaProducer
import json

# Create a Kafka consumer to subscribe to a specific topic
consumer = KafkaConsumer('example_topic', bootstrap_servers='localhost:9092')

# Create a Kafka producer to send messages to a specific topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Define a function that performs a specific task
def process_data(data):
    # Perform some task on the data, such as transforming or analyzing it
    result = data * 2
    return result

# Continuously poll for new messages on the topic
for message in consumer:
    # Deserialize the message
    data = json.loads(message.value)
    # Process the data and send the result to another topic
    result = process_data(data)
    producer.send('result_topic', value=json.dumps(result))
    producer.flush()

3. API Gateway (FastAPI kullandım)

from fastapi import FastAPI, Request, HTTPException
from kafka import KafkaProducer, KafkaConsumer
import json

app = FastAPI()
producer = KafkaProducer(bootstrap_servers='localhost:9092')

@app.post("/data/{topic}")
async def send_data(request: Request, topic: str):
    data = await request.json()
    producer.send(topic, value=json.dumps(data))
    producer.flush()
    return {"status": "success"}

@app.get("/data/{topic}")
def receive_data(topic: str):
    consumer = KafkaConsumer(topic, bootstrap_servers='localhost:9092')
    data = []
    for message in consumer:
        data.append(json.loads(message.value))
    return data

@app.get("/topics")
def get_topics():
    try:
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
        topics = consumer.topics()
        return topics
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

4. Microservice

from kafka import KafkaConsumer, KafkaProducer
import json

# Create a Kafka consumer to subscribe to a specific topic
consumer = KafkaConsumer('example_topic', bootstrap_servers='localhost:9092')

# Create a Kafka producer to send messages to a specific topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Define a function that performs a specific task
def process_data(data):
    # Perform some task on the data, such as transforming or analyzing it
    result = data * 2
    return result

# Continuously poll for new messages on the topic
for message in consumer:
    # Deserialize the message
    data = json.loads(message.value)
    # Process the data and send the result to another topic
    result = process_data(data)
    producer.send('result_topic', value=json.dumps(result))
    producer.flush()

5. ZooKeeper

Apache ZooKeeper, genellikle Apache Kafka ile birlikte kullanılan bir dağıtık koordinasyon hizmetidir. İşte Python’da bir ZooKeeper örneğini kurmak için bir örnek kod:

from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()

# create a znode
zk.create("/my_znode", b"my_data")

# check if znode exists
if zk.exists("/my_znode"):
    print("Znode exists.")

# get data from znode
data, stat = zk.get("/my_znode")
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

# update data in znode
zk.set("/my_znode", b"new_data")

# delete a znode
zk.delete("/my_znode")

zk.stop()

Bu örnekte, kazoo kütüphanesini kullanarak localhost’ta 2181 portunda çalışan bir ZooKeeper örneği ile etkileşim kuruyoruz. Kod yeni bir znode oluşturuyor ve adı “/my_znode” ile veri “my_data” ile, znode var mı diye kontrol ediyor, veriyi alıyor, güncelliyor ve znode’u siliyor. Lütfen unutmayın ki bu sadece bir örnektir, üretim ortamında kullanmadan önce uygun hata yönetimi ve testleri yapmanız gerekir, ayrıca host, port ve tüm parametrelerin kullanılan zookeeper kümesinin özelleştirilmiş yapılandırmasına göre ayarlanması gerekir.

Değerli takipçilerim, bu makale Python ve Kafka ile mikroservis mimarisine giriş seviyesinde olup Producer ve Consumer kodlarının TOPIC parametresini kullanacak şekilde sınıflara dönüştürülmesi gerektiği kanaatimdeyim. Bu görevi şimdilik sizlere bırakıyorum.

Mutlu kodlamalar 🙂

Reklam

Bir Cevap Yazın

Aşağıya bilgilerinizi girin veya oturum açmak için bir simgeye tıklayın:

WordPress.com Logosu

WordPress.com hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Facebook fotoğrafı

Facebook hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Connecting to %s