Abel'Blog

我干了什么?究竟拿了时间换了什么?

0%

kafka笔记

简介

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。百度百科

kafka的架构师jay kreps对于kafka的名称由来是这样讲的,由于jay kreps非常喜欢franz kafka,并且觉得kafka这个名字很酷,因此取了个和消息传递系统完全不相干的名称kafka,该名字并没有特别的含义。franz kafka就是写《变形记》的那位犹太作家。

环境部署

使用docker安装

kafka需要使用zookeeper来做集群选举相关逻辑。我们使用docker快速部署这两个软件。

1
2
3
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<yours_ip_addr>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<yours_ip_addr>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

查看工具

工具都是java编写的,能直接跨平台运行。

zookeeper查看工具 zooinspector

offsetexplorer-kafka查看器

概念

kafka里面有通过 zookeeper 来制作的选举,服务发现类似的功能。

可以在kafka里面创建一个topic,然后在里面的partition里面写入自己的信息。kafka将会在写入信息的时候,寻找一个broker的leader,写入。集群系统将会把这些信息在同步到各个机器上。

通过offsetexplorer-kafka工具能看到在kafka里面写入的数据。

实践

这里我们使用github里面最受欢迎的kafka go语言版本的库—- sarama 编写一个生产者消费者的例子。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package main

import (
"context"
"fmt"
"os"
"sync"
"time"

"github.com/Shopify/sarama"
)

var (
ctx context.Context
ctxFun context.CancelFunc
mq chan string
wg sync.WaitGroup
)

const topic string = "test-topic"

func main() {

ctx, ctxFun = context.WithCancel(context.Background())
mq = make(chan string, 10)
addr := []string{"127.0.0.1:9092"}

config := sarama.NewConfig()
// 最好能对齐kafka的版本
config.Version = sarama.V2_2_0_0

if client, err := sarama.NewClient(addr, config); err != nil {
fmt.Println("initProducer NewClient error ", err.Error())
return
} else {
defer client.Close()
if producer, err := sarama.NewAsyncProducerFromClient(client); err != nil {
fmt.Println("initProducer NewAsyncProducerFromClient error ", err.Error())
return
} else {

defer producer.Close()
go sendLoop(producer)
fmt.Println("initProducer success")
}
}

tick := time.NewTicker(time.Second * 2)
go func() {
for {
select {
case <-tick.C:
mq <- "test message"
case <-ctx.Done():
return
}
}
}()

c := make(chan os.Signal, 1)
<-c
}

func sendLoop(producer sarama.AsyncProducer) {
wg.Add(1)
defer wg.Done()

for {
select {
case msg, ok := <-mq:
if !ok {
return
}
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Partition: 0, Key: nil, Value: sarama.StringEncoder(msg)}
case err, ok := <-producer.Errors():
if ok {
fmt.Println("sendLoop send messages fail error ", err.Error())
}
case <-ctx.Done():
fmt.Println("sendLoop exit ")
return
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main

import (
"context"
"fmt"
"os"
"sync"

"github.com/Shopify/sarama"
)

var (
ctx context.Context
ctxFun context.CancelFunc
mq chan string
wg sync.WaitGroup
)

const topic string = "test-topic"

func main() {

ctx, ctxFun = context.WithCancel(context.Background())
mq = make(chan string, 10)
addr := []string{"127.0.0.1:9092"}

config := sarama.NewConfig()
// 默认的NewConfig中的收包的处理事件大概是100-500ms MaxWaitTime ,如果

// 最好能对齐kafka的版本
config.Version = sarama.V2_2_0_0

if client, err := sarama.NewClient(addr, config); err != nil {
fmt.Println("initProducer NewClient error ", err.Error())
return
} else {
defer client.Close()

if consumer, err := sarama.NewConsumerFromClient(client); err != nil {
fmt.Println("subscribeTopic NewConsumerFromClient fail err ", err.Error(), " addr ", addr)
return
} else {

if partition, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest); err != nil {
consumer.Close()
fmt.Println("subscribeTopic ConsumePartition fail ", err.Error(), " addr ", addr)
return
} else {
go consumeLoop(partition)
fmt.Println("subscribeTopic ConsumePartition success addr ", addr)
}
}
}

c := make(chan os.Signal, 1)
<-c

ctxFun()
wg.Wait()
}

func consumeLoop(partition sarama.PartitionConsumer) {
wg.Add(1)
defer wg.Done()

for {
select {
case msg, ok := <-partition.Messages():
if !ok {
fmt.Println("consumeLoop read messages fail Partition.Messages error addr ")
return
}
strMsg := string(msg.Value)
fmt.Println("consumeLoop value ", strMsg)
case err, ok := <-partition.Errors():
if ok {
fmt.Println("consumeLoop read messages fail error ", err.Error())
}
case <-ctx.Done():
fmt.Println("consumeLoop exit")
return
}
}
}

func CreateTopic(client sarama.Client,topicname string) {
bkmd, _ := client.Controller()
req := new(sarama.CreateTopicsRequest)
req.Timeout = time.Second * 15
req.Version = 1
req.TopicDetails = make(map[string]*sarama.TopicDetail)
topicmd := new(sarama.TopicDetail)
topicmd.NumPartitions = 1
topicmd.ReplicationFactor = 1
req.TopicDetails[topicname] = topicmd
if res, err := bkmd.CreateTopics(req); err != nil {
_ = err
_ = res
}
}

2021-10-18 13:41:33.673 ERROR Sarama.go initProducer NewClient error kafka: client has run out of available brokers to talk to (Is your cluster reachable?) 2021-10-18 13:41:33.676 ERROR Sarama.go Init fail initProducer() err kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

引用