Merhaba değerli takipçilerim,
ChatGPT’ten Python ve Kafka kullanarak örnek bir Microservice mimarisi hazırlamasını istedim. Ve cevaplara göre bu makaleyi hazırladım.

Öncelikle Producer, Consumer ve API Gateway sınıflarına ihtiyacımız var ve sonra bu Producer ve Consumer sınıflarını kullanarak mikroservislerimizi kodluyoruz.
Microservice Mimarisi:
- Producer
- Consumer
- API Gateway
- Microservice’ler
- Zookeeper
1. Producer
Cassandra veritabanından sürekli güncel veriyi alıp Kafkaya gönderen örnek bir Producer yazdırdım:
from cassandra.cluster import Cluster
from kafka import KafkaProducer
import json
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Create a keyspace and table
session.execute("""
CREATE KEYSPACE IF NOT EXISTS mykeyspace
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
""")
session.execute("""
CREATE TABLE IF NOT EXISTS mykeyspace.data (
id int PRIMARY KEY,
value text
)
""")
# Function to retrieve data from Cassandra
def get_data_from_cassandra():
rows = session.execute("SELECT * FROM mykeyspace.data")
data = []
for row in rows:
data.append({'id': row.id, 'value': row.value})
return data
# Function to send data to Kafka topic
def send_data_to_kafka(topic, data):
for item in data:
producer.send(topic, value=json.dumps(item))
producer.flush()
# Continuously poll for new data in Cassandra
while True:
data = get_data_from_cassandra()
send_data_to_kafka("example_topic", data)
2. Consumer
from kafka import KafkaConsumer, KafkaProducer
import json
# Create a Kafka consumer to subscribe to a specific topic
consumer = KafkaConsumer('example_topic', bootstrap_servers='localhost:9092')
# Create a Kafka producer to send messages to a specific topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Define a function that performs a specific task
def process_data(data):
# Perform some task on the data, such as transforming or analyzing it
result = data * 2
return result
# Continuously poll for new messages on the topic
for message in consumer:
# Deserialize the message
data = json.loads(message.value)
# Process the data and send the result to another topic
result = process_data(data)
producer.send('result_topic', value=json.dumps(result))
producer.flush()
3. API Gateway (FastAPI kullandım)
from fastapi import FastAPI, Request, HTTPException
from kafka import KafkaProducer, KafkaConsumer
import json
app = FastAPI()
producer = KafkaProducer(bootstrap_servers='localhost:9092')
@app.post("/data/{topic}")
async def send_data(request: Request, topic: str):
data = await request.json()
producer.send(topic, value=json.dumps(data))
producer.flush()
return {"status": "success"}
@app.get("/data/{topic}")
def receive_data(topic: str):
consumer = KafkaConsumer(topic, bootstrap_servers='localhost:9092')
data = []
for message in consumer:
data.append(json.loads(message.value))
return data
@app.get("/topics")
def get_topics():
try:
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
topics = consumer.topics()
return topics
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
4. Microservice
from kafka import KafkaConsumer, KafkaProducer
import json
# Create a Kafka consumer to subscribe to a specific topic
consumer = KafkaConsumer('example_topic', bootstrap_servers='localhost:9092')
# Create a Kafka producer to send messages to a specific topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Define a function that performs a specific task
def process_data(data):
# Perform some task on the data, such as transforming or analyzing it
result = data * 2
return result
# Continuously poll for new messages on the topic
for message in consumer:
# Deserialize the message
data = json.loads(message.value)
# Process the data and send the result to another topic
result = process_data(data)
producer.send('result_topic', value=json.dumps(result))
producer.flush()
5. ZooKeeper
Apache ZooKeeper, genellikle Apache Kafka ile birlikte kullanılan bir dağıtık koordinasyon hizmetidir. İşte Python’da bir ZooKeeper örneğini kurmak için bir örnek kod:
from kazoo.client import KazooClient
zk = KazooClient(hosts='localhost:2181')
zk.start()
# create a znode
zk.create("/my_znode", b"my_data")
# check if znode exists
if zk.exists("/my_znode"):
print("Znode exists.")
# get data from znode
data, stat = zk.get("/my_znode")
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
# update data in znode
zk.set("/my_znode", b"new_data")
# delete a znode
zk.delete("/my_znode")
zk.stop()
Bu örnekte, kazoo
kütüphanesini kullanarak localhost’ta 2181 portunda çalışan bir ZooKeeper örneği ile etkileşim kuruyoruz. Kod yeni bir znode oluşturuyor ve adı “/my_znode” ile veri “my_data” ile, znode var mı diye kontrol ediyor, veriyi alıyor, güncelliyor ve znode’u siliyor. Lütfen unutmayın ki bu sadece bir örnektir, üretim ortamında kullanmadan önce uygun hata yönetimi ve testleri yapmanız gerekir, ayrıca host, port ve tüm parametrelerin kullanılan zookeeper kümesinin özelleştirilmiş yapılandırmasına göre ayarlanması gerekir.
Değerli takipçilerim, bu makale Python ve Kafka ile mikroservis mimarisine giriş seviyesinde olup Producer ve Consumer kodlarının TOPIC parametresini kullanacak şekilde sınıflara dönüştürülmesi gerektiği kanaatimdeyim. Bu görevi şimdilik sizlere bırakıyorum.
Mutlu kodlamalar 🙂