api
2025年8月5日小于 1 分钟
python
confluent-kafka
from confluent_kafka.admin import AdminClient, NewTopic
# Kafka broker 地址
kafka_config = {'bootstrap.servers': '192.168.3.51:9092'}
# 创建 AdminClient 实例
admin_client = AdminClient(kafka_config)
# 要创建的 topic 名称
topic_name = 'my-new-topic'
# 构造 NewTopic 对象(1 分区,1 副本)
new_topic = NewTopic(topic=topic_name, num_partitions=1, replication_factor=1)
# 创建 Topic
fs = admin_client.create_topics([new_topic])
# 检查结果
for topic, f in fs.items():
    try:
        f.result()  # 阻塞直到完成
        print(f"✅ Topic '{topic}' created")
    except Exception as e:
        print(f"❌ Failed to create topic '{topic}': {e}")