【Kafka教程】(1)Kafka基本概念与集群配置详细教程
一、Kafka介绍
Kafka是Apache开发的一款为大数据而生的分布式消息中间件,支持百亿级吞吐量。消息中间件的出现让生产者和消费者不直接进行通信,降低了传统消息传输模式下的耦合性。通过N个生产者进行生产并将数据放在消息队列中,再由后端N个消费者进行处理,解决了生产速度和消费速度不匹配的问题,还可以避免生产者或者消费者任意一方出现问题时发生数据丢失的情况。如果消息队列满了可以考虑对消息队列服务进行扩容。为了避免Kafka数据丢失或者数据重复带来的数据不一致性问题,需要服务端、客户端都做好相应配置以及采取一些补偿方案:
· 生产端:要求不能少生产消息。比如使用带回调方法的API来确认消息发送是否成功、生产消息有重试机制等
· 服务端:要求不能丢失消息。比如对副本做好相关配置
· 消费端:要求不能少消费消息
二、Kafka相关名词解释
· broker:kafka集群由多个节点构成,每个节点上的实例就叫做broker,broker又分为了Leader(主)和Follower(从)
· producer:生产者,用于产生数据到Kafka,还决定了消息的属性,比如partition和replicas的数量(kafka配置文件中虽然也可以指定这些属性,但只是默认值)
· consumer:消费者,用于处理生产出的数据并将消费进度(offset)保存在Kafka的topic中
· consumer Group:每个consumer都属于一个特定的Group(如果没配置Group name的话则是默认的Group),将多个消费者集中到一个组中可以提升数据消费能力
· topic:主题,类似表名,可以作为数据分类隔离的作用
· partition:每个topic可以有多个partition分区,每个partition分为Leader(主)和Follower(从),当Leader失效后会从Follower中选举,用于保证服务的高可用。合理的分区可以提高Kafka集群的负载能力,实现多partition写入,但是在需要严格保证消息消费顺序的场景下,需要将partition设置为1
· replicas:副本,每个partition可以配置多个replicas作为备份。在这些副本中会有一个leader角色负责数据的读写操作,而其他副本则属于follower,仅和leader保持同步而不提供其他服务。当leader故障时,Kafka根据ISR(将所有follower加入到ISR中,如果有follower迟迟不能和leader保持同步则将其踢出)选取一个follower提升为leader。这些副本之间存在一个同样的高水位(High Watermark),高水位的作用就是让消费者只能看到水位线之前的数据,这样可以确保角色切换后对于消费者来说数据是一致的,但是不能保证数据的完整或者不重复。数据的丢失或者重复问题由ACK机制负责,ACK级别分了0、1、-1三种,0、1均有丢失数据的问题,-1代表leader和follower全部落盘成功后才返回ack,数据不会丢失,但可能产生重复数据。
· offset:偏移量,用于标识下次应该读取的消息的位置
· zookeeper:Kafka所依赖的注册中心服务,由于kafka支持集群化,当leader和follower不在一个节点中就需要将leader的信息注册在Zookeeper中。对于Kafka而言,它将每个节点的metadata信息告诉zk,再由zk来配置和存储节点与主题队列信息。zk和kafka的topic有一样的leader\follower角色
三、搭建Zookeeper+Kafka集群
1、由于Kafka是一个分布式系统,它依赖ZooKeeper来完成协调任务、状态与配置管理,所以在运行Kafka之前要先安装并启动ZK,具体可参考本站文章——《ZooKeeper集群配置教程》
2、通过官方网站kafka.apache.org/downloads下载Kafka二进制安装包,解压即可使用
tar zxf kafka_2.12-3.0.0.tgz mv kafka_2.12-3.0.0 /usr/local/ ln -s /usr/local/kafka_2.12-3.0.0 /usr/local/kafka
3、修改Kafka配置文件
#通常只需要修改server.properties这个文件,虽然还有生产者、消费者的属性文件,但是一般都是由程序去控制 vi config/server.properties broker.id=1 #集群中每个节点ID不能相同 log.dirs=/data/kafka/log #Kafka消息存放路径,并不是日志,kafka的日志会自动生成到其它目录中 zookeeper.connect=192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 #zk服务地址与端口 listeners=PLAINTEST://192.168.1.100:9092 #kafka监听端口 #优化配置 log.flush.interval.message=10000 #日志从内存刷到磁盘的阈值,达到这个数量就落盘,通常要减少落盘的频率 log.retention.hours=168 #日志落盘后保留多少小时,超时则删除,即便没有消费,避免磁盘写满 log.segment.bytes=1073741824 #单个日志的大小,超出则创建新的日志,删掉旧的日志 log.retention.check.interval.ms=300000 #检测日志是否达到删除条件的周期
4、使用kafka-server-start.sh脚本后台启动kafka
nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & # bin/kafka-server-start.sh -daemon config/server.properties #另一种后台启动方式
5、登录ZooKeeper后可以看到kfaka的信息已经注册进来
./zkCli.sh
ls /
四、创建topic,完成数据的生产与消费
1、使用kafka-topics.sh脚本完成topic的创建、查看与删除,由于topic名称冲突限制,topic名称不能同时包含.下划线_
#创建topic /usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 172.20.1.171:9092 --replication-factor 1 --partitions 1 --topic test_topic#--zookeeper:如果ZK是集群的话,随便写一个地址都可以 #--replication-factor:topic的总副本数,1代表主备节点总共只有一个副本,相当于没有备份,所以通常3节点的话设置为2 #--partitions:topic分区的数量,通常和节点数保持一致 #--topic:队列名字 #查看topic /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 172.20.1.171:9092 #查看topic详细信息,包含分区、副本与Leader情况 kafka-topics.sh --describe --topic test --zookeeper 192.168.1.100:2181 #删除topic kafka-topics.sh --zookeeper 172.20.0.155:2181,172.20.0.156:2181,172.20.0.157:2181 --delete --topic test_topic
2、在2个终端下分别使用下面的脚本模拟数据生产与消费过程
kafka-console-producer.sh --broker-list localhost:9092 --topic hello #在hello这个队列中开始生产数据
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning #从队列头部开始消费数据
3、登录Zookeeper验证,任意一台ZK节点应该都可以看到topic的信息。不过这里看到的只是信息而已,数据其实还是在Kfaka中。先看看ZK中的数据是怎么样的:
./zkCli.sh
ls /brokers/topic/ #路径根据实际情况做调整
4、在Kfaka配置文件中log.dirs所指定的目录中,存放了topic数据目录,在创建topic时配置了几个分区就会生成几份目录(如果创建topic时--partitions设定的是1,而Kfaka又有5个集群的话,那数据只会在一个节点上看到,ZK这个时候就发挥出作用了,它能告知用户数据在哪台上)。每个目录中的.log文件保存有队列中的消息,生产者生产的消息都会追加到log文件的尾部。而.index文件则记录了数据偏移量,根据这个快速查询到log中的数据。
5、推荐使用开源工具kafka-eagle,通过图形化界面方便的查看和管理Kafka集群
五、Kafka的扩容配置,需要对已有topic增加partition和replicas
1、首先要明白扩容的话意味着需要增加Kafka节点,所以第一件事就是在新节点上部署好Kafka,配置要和之前节点一致
2、登录ZK节点然后查看ids信息,如果发现新节点代表Kafka扩容是成功的
3、列出所有topic,找到需要扩容的topic,假设这里的topic是test
./kafka-topics.sh --list --zookeeper 192.168.1.101:2181
./kafka-topics.sh --describe --zookeeper 192.168.1.101:2181/kafka --topic test #可以看到test的partition和replicase等信息
4、 对partition进行扩容,扩容后重新查看信息可以看到变化,但是这个时候replicas依然只有一份
./kafka-topics.sh --zookeeper 192.168.1.101:2181/kafka --alter --topic test --partitions 3 #扩充到3个partitions
5、replicas的扩容需要先生成json文件,然后再用命令去读取该文件。需要注意如果该文件以前给其他topic使用过,需要重新生成一份而不能直接在上面修改,否则会失败
vi test.json
{
"partitions": [
{
"topic": "test", #需要扩容的topic
"partition": 0, #需要增加replicas的partitons
"replicas": [ #replicas需要增加在哪个broker.id上
34,
35
]
},
{
"topic": "test",
"partition": 1,
"replicas": [
34,
35,
36
]
}
]
}
6、通过json文件扩容Replicas
./kafka-reassign-partitions.sh --zookeeper 192.168.1.101:2181/kafka --reassignment-json-file test.json --execute
7、最后查看topic信息
./kafka-topics.sh --describe --zookeeper 192.168.1.101:2181/kafka --topic test
评论
爱乐
回复博主写的不错
TangLu
回复@爱乐 @爱乐:谢谢支持 一起学习