跳转到内容
View in the app

A better way to browse. Learn more.

彼岸论坛

A full-screen app on your home screen with push notifications, badges and more.

To install this app on iOS and iPadOS
  1. Tap the Share icon in Safari
  2. Scroll the menu and tap Add to Home Screen.
  3. Tap Add in the top-right corner.
To install this app on Android
  1. Tap the 3-dot menu (⋮) in the top-right corner of the browser.
  2. Tap Add to Home screen or Install app.
  3. Confirm by tapping Install.
欢迎抵达彼岸 彼岸花开 此处谁在 -彼岸论坛

[问与答] kafka 单节点问题请教

发表于
佬们,请教个问题,我需要搭建一个单节点的 kafka 服务器,,根据官网的教程,我已经搭建好了,现在需要局域网的节点的能连 kafka 服务器,为了 kafka 服务器能局域网能访问,需要修改配置文件 ,修改内容如下

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.5.155:9092

这样操作后,在本机上,创建话题,再使用生产者,和消费者没有问题。但是我在另一台服务器上使用 Python3 脚本模拟生产者,消费者就有问题。
下面是生产者脚本
from kafka3 import KafkaProducer
import json

创建 Kafka Producer 实例
producer = KafkaProducer(
bootstrap_servers=‘192.168.5.155:9092’, # Kafka 服务器的地址和端口
value_serializer=lambda v: json.dumps(v).encode(‘utf-8’) # 将消息序列化为 JSON 格式
)

发送消息到 Kafka 的主题
producer.send(‘test-topic’, {‘key’: ‘value’}) # ‘my-topic’ 替换为你的实际主题名

确保所有消息都被发送
producer.flush()

print(“消息发送成功”)
下面是消费者的脚本
from kafka3 import KafkaConsumer
import json

创建 Kafka Consumer 实例
consumer = KafkaConsumer(
‘test-topic’, # 替换为你的实际主题名
bootstrap_servers=‘192.168.5.155:9092’, # Kafka 服务器的地址和端口
auto_offset_reset=‘earliest’, # 从最早的消息开始读取
enable_auto_commit=True, # 自动提交偏移量
group_id=‘my-group’, # 消费者组 ID
value_deserializer=lambda x: json.loads(x.decode(‘utf-8’)) # 将消息反序列化为 JSON 格式
)

消费消息
for message in consumer:
print(f"接收到消息: {message.value}")
运行脚本报错了,
Traceback (most recent call last):
File “/root/python_code/writing.py”, line 5, in
producer = KafkaProducer(
File “/usr/local/lib/python3.10/dist-packages/kafka3/producer/kafka.py”, line 383, in init
client = self.config[‘kafka_client’](
File “/usr/local/lib/python3.10/dist-packages/kafka3/client_async.py”, line 244, in init
self.config[‘api_version’] = self.check_version(timeout=check_timeout)
File “/usr/local/lib/python3.10/dist-packages/kafka3/client_async.py”, line 900, in check_version
raise Errors.NoBrokersAvailable()
kafka3.errors.NoBrokersAvailable: NoBrokersAvailable

想问哈各位哪里不对,脚本是用 chatgpt4 生成的,我在 192.168.5.111 上 ping 192.168.5.155 是通的,我试了 telnet 192.168.5.155 9092 不通,但我已经 关了防火墙,我查看了又有端口监听在 9092.
感谢各位大佬给我指点。

Featured Replies

No posts to show

创建帐户或登录来提出意见

Configure browser push notifications

Chrome (Android)
  1. Tap the lock icon next to the address bar.
  2. Tap Permissions → Notifications.
  3. Adjust your preference.
Chrome (Desktop)
  1. Click the padlock icon in the address bar.
  2. Select Site settings.
  3. Find Notifications and adjust your preference.