本站所有文章均为原创,如对您有帮助,恳请帮忙点击任何一处广告
  • 首页
  • 大数据
  • 大数据组件之Kafka基本概念与集群配置详细教程

大数据组件之Kafka基本概念与集群配置详细教程

发布:TangLu2020-5-17 16:18分类: 大数据 标签: kafka

一、Kafka介绍

Kafka是Apache旗下的一款分布式消息中间件。消息中间件可以将生产者生产出的数据放在一个消息队列中,等待后端消费者来消费处理。除了处理速度比数据库快的优势外,消息队列后还可以避免生产者或者消费者任意一方出现问题时发生数据丢失的情况。所以在业务高峰期的时候,可以将一些数据放在消息队列中,等待系统慢慢消费处理,实现削峰。如果消息队列满了可以考虑对消息队列服务进行扩容。


二、Kafka中的名词

1、broker:kafka集群由多个节点构成,每个节点上的实例就叫做broker

2、producer:生产者,由它来生产数据和决定消息的属性,比如partition和replicas的数量。kafka配置文件中虽然也可以指定,但那个只是默认值
3、consumer:消费者,由它来处理生产出的数据,将消费进度offset保存在Kafka的topic中
4、topic:主题,起到数据分类的作用

5、partition:分区,每个topic可以有多个分区。合理的分区可以提高Kafka集群的负载能力

5、replicas副本,每个partition可以配置多个replicas作为备份。在这些副本中会有一个leader角色负责数据的读写操作,而其他副本则属于follower,仅和leader保持同步而不提供其他服务。当leader故障时,Kafka根据ISR(将所有follower加入到ISR中,如果有follower迟迟不能和leader保持同步则将其踢出)选取一个follower提升为leader。这些副本之间存在一个同样的高水位(High Watermark),高水位的作用就是让消费者只能看到水位线之前的数据,这样可以确保角色切换后对于消费者来说数据是一致的,但是不能保证数据的完整或者不重复。数据的丢失或者重复问题由ACK机制负责,ACK级别分了0、1、-1三种,0、1均有丢失数据的问题,-1代表leader和follower全部落盘成功后才返回ack,数据不会丢失,但可能产生重复数据

6、zookeeper:注册中心服务。由于kafka支持集群化,leader和follower可能不在一个服务器中,这个时候就需要将leader的信息注册在Zookeeper中。对于Kafka而言,它将每个节点信息的运行情况告诉zk,再由zk来配置和存储节点与主题队列信息。zk和kafka的topic有一样的leader\follower角色


三、搭建Zookeeper+Kafka集群

1、由于Kafka是一个分布式系统,它依赖ZooKeeper来完成协调任务、状态与配置管理,所以在运行Kafka之前要先安装并启动ZK,具体可参考本站文章——《ZooKeeper集群配置教程》


2、下载并安装Kafka

#在kafka.apache.org/downloads下载二进制安装包,解压后即可使用
tar zxf kafka_2.11-2.1.1.tgz
mv kafka_2.11-2.1.1 /usr/local/


3、修改Kafka配置文件

#通常只需要修改server.properties这个文件,虽然还有生产者、消费者的属性文件,但是一般都是由程序去控制
vi config/server.properties
broker.id=1  #集群中每个节点ID不能相同
log.dirs=/data/kafka/log  #Kafka消息存放路径,并不是日志,kafka的日志会自动生成到其它目录中
zookeeper.connect=192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181  #zk服务地址与端口
listeners=PLAINTEST://192.168.1.100:9092  #kafka监听端口
#优化配置
log.flush.interval.message=10000  #日志从内存刷到磁盘的阈值,达到这个数量就落盘,通常要减少落盘的频率
log.retention.hours=168  #日志落盘后保留多少小时,超时则删除,即便没有消费,避免磁盘写满
log.segment.bytes=1073741824  #单个日志的大小,超出则创建新的日志,删掉旧的日志
log.retention.check.interval.ms=300000  #检测日志是否达到删除条件的周期


4、使用kafka-server-start.sh脚本后台启动kafka

nohup bin/kafka-server-start.sh config/server.properties &
# bin/kafka-server-start.sh -daemon config/server.properties #另一种后台启动方式


5、登录ZooKeeper后可以看到kfaka的信息已经注册进来

./zkCli.sh 
ls /


kafka1.png


四、创建topic,完成数据的生产与消费

1、使用kafka-topics.sh脚本完成topic的创建、查看与删除

#创建topic
./kafka-topics.sh --create --zookeeper 192.168.1.101:2181 --replication-factor 1 --partitions 1 --topic test  
#--zookeeper:如果ZK是集群的话,随便写一个地址都可以
#--replication-factor:topic的副本数,通常3节点的话2个副本就够了,而1其实代表没有副本
#--partitions:topic分区的数量,通常和节点数保持一致
#--topic:队列名字

#查看topic
kafka-topics.sh --list --zookeeper 172.20.0.155:2181

#查看topic详细信息,包含分区、副本与Leader情况
kafka-topics.sh --describe --topic test --zookeeper 192.168.1.100:2181 

#删除topic
kafka-topics.sh --zookeeper 172.20.0.155:2181,172.20.0.156:2181,172.20.0.157:2181 --delete --topic test_topic


2、在2个终端下分别使用下面的脚本模拟数据生产与消费过程

kafka-console-producer.sh --broker-list localhost:9092 --topic hello  #在hello这个队列中开始生产数据
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning  #从队列头部开始消费数据


3、登录Zookeeper验证,任意一台ZK节点应该都可以看到topic的信息。不过这里看到的只是信息而已,数据其实还是在Kfaka中。先看看ZK中的数据是怎么样的:

./zkCli.sh
ls /brokers/topic/  #路径根据实际情况做调整


kafka2.png


4、在Kfaka配置文件中log.dirs所指定的目录中,存放了topic数据目录,在创建topic时配置了几个分区就会生成几份目录(如果创建topic时--partitions设定的是1,而Kfaka又有5个集群的话,那数据只会在一个节点上看到,ZK这个时候就发挥出作用了,它能告知用户数据在哪台上)。每个目录中的.log文件保存有队列中的消息,生产者生产的消息都会追加到log文件的尾部。而.index文件则记录了数据偏移量,根据这个快速查询到log中的数据。


5、推荐使用开源工具kafka-eagle,通过图形化界面方便的查看和管理Kafka集群


五、Kafka的扩容配置,需要对已有topic增加partition和replicas

1、首先要明白扩容的话意味着需要增加Kafka节点,所以第一件事就是在新节点上部署好Kafka,配置要和之前节点一致

2、登录ZK节点然后查看ids信息,如果发现新节点代表Kafka扩容是成功的

kafka3.png

3、列出所有topic,找到需要扩容的topic,假设这里的topic是test

./kafka-topics.sh --list --zookeeper 192.168.1.101:2181
./kafka-topics.sh --describe --zookeeper 192.168.1.101:2181/kafka --topic test  #可以看到test的partition和replicase等信息


4、 对partition进行扩容,扩容后重新查看信息可以看到变化,但是这个时候replicas依然只有一份

./kafka-topics.sh --zookeeper 192.168.1.101:2181/kafka --alter --topic test --partitions 3  #扩充到3个partitions


5、replicas的扩容需要先生成json文件,然后再用命令去读取该文件。需要注意如果该文件以前给其他topic使用过,需要重新生成一份而不能直接在上面修改,否则会失败

vi test.json
{
    "partitions": [
        {
            "topic": "test",  #需要扩容的topic
            "partition": 0,  #需要增加replicas的partitons
            "replicas": [  #replicas需要增加在哪个broker.id上
                34,
                35
            ]
        },
        {
            "topic": "test",
            "partition": 1,
            "replicas": [
                34,
                35,
                36
            ]
        }
    ]
}


6、通过json文件扩容Replicas

./kafka-reassign-partitions.sh --zookeeper 192.168.1.101:2181/kafka --reassignment-json-file test.json  --execute 


7、最后查看topic信息

./kafka-topics.sh --describe --zookeeper 192.168.1.101:2181/kafka --topic test


温馨提示如有转载或引用以上内容之必要,敬请将本文链接作为出处标注,谢谢合作!
et_highlighter51
版权所有:《Linux运维技术学习站点
文章标题:《大数据组件之Kafka基本概念与集群配置详细教程
除非注明,文章均为 《Linux运维技术学习站点》 原创
转载请注明本文短网址:http://www.linuxe.cn/post-510.html  [生成短网址]

已有 2/3005 人参与

评论:

爱乐 2020-06-16 19:01
博主写的不错
TangLu 2020-06-16 20:27
@爱乐:谢谢支持 一起学习

发表评论:

欢迎分享Linux运维技术学习站点

欢迎使用手机扫描访问本站,还可以关注微信哦~