Kafka集群部署指南

9/29/2018 kafka

# Kafka集群部署指南

# 一、前言

# 1、Kafka简介

Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch (opens new window)Hadoop (opens new window)等)

Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。

# 2、Kafka架构

在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。

概念/对象 简单说明
Broker Kafka节点
Topic 主题,用来承载消息
Partition 分区,用于主题分片存储
Producer 生产者,向主题发布消息的应用
Consumer 消费者,从主题订阅消息的应用
Consumer Group 消费者组,由多个消费者组成

# 3、准备工作

# 1、Kafka服务器

准备3台CentOS服务器,并配置好静态IP、主机名

服务器名 IP 说明
kafka01(elk-1) 172.16.249.101 Kafka节点1
kafka02(elk-2) 172.16.249.102 Kafka节点2
kafka03(elk-3) 172.16.249.103 Kafka节点3

软件版本说明

说明
Linux Server CentOS 7
Kafka 2.3.0

# 2、ZooKeeper集群

Kakfa集群需要依赖ZooKeeper存储Broker、Topic等信息,这里我们部署三台ZK

# 二、部署过程

# 1、应用&数据目录

#创建应用目录
mkdir /usr/kafka

#创建Kafka数据目录
mkdir /kafka
mkdir /kafka/logs
chmod 777 -R /kafka
1
2
3
4
5
6
7

# 2、下载&解压

Kafka官方下载地址:https://kafka.apache.org/down... (opens new window) 这次我下载的是2.3.0版本

#进入 opt下
cd /opt/

#下载安装包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz 

#创建应用目录,并解压到应用目录
mkdir /usr/kafka 
tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka
# 创建日志目录
mkdir -pv /kafka/logs
1
2
3
4
5
6
7
8
9
10
11

kafka_2.12-2.3.0.tgz 其中2.12是Scala编译器的版本,2.3.0才是Kafka的版本

# 3、Kafka节点配置

#进入应用目录
cd /usr/kafka/kafka_2.12-2.3.0/

#修改配置文件
vim config/server.properties
1
2
3
4
5

# 通用配置

配置日志目录、指定ZooKeeper服务器

# A comma separated list of directories under which to store log files
log.dirs=/kafka/logs

# root directory for all kafka znodes.
zookeeper.connect=elk-1:2181,elk-2:2181,elk-3:2181
1
2
3
4
5

# 分节点配置

  • Kafka01
broker.id=0

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://elk-1:9092
1
2
3
4
  • Kafka02
broker.id=1

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://elk-2:9092
1
2
3
4
  • Kafka03
broker.id=2

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://elk-3:9092
1
2
3
4

# 5、启动Kafka

#进入kafka根目录
cd /usr/kafka/kafka_2.12-2.3.0/
#启动
nohup ./bin/kafka-server-start.sh config/server.properties &

#启动成功输出示例(最后几行)
[2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
1
2
3
4
5
6
7
8
9

# 三、Kafka测试

# 1、创建Topic

在kafka01(Broker)上创建测试Tpoic:test-ken-io,这里我们指定了3个副本、1个分区

bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io
1

Topic在kafka01上创建后也会同步到集群中另外两个Broker:kafka02、kafka03

# 2、查看Topic

我们可以通过命令列出指定Broker的

bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:9092
1

# 3、发送消息

这里我们向Broker(id=0)的Topic=test-ken-io发送消息

bin/kafka-console-producer.sh --broker-list  192.168.88.51:9092  --topic test-ken-io

#消息内容
> test by ken.io
1
2
3
4

# 4、消费消息

在Kafka02上消费Broker03的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning
1

在Kafka03上消费Broker02的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning
1

然后均能收到消息

test by ken.io
1

这是因为这两个消费消息的命令是建立了两个不同的Consumer 如果我们启动Consumer指定Consumer Group Id就可以作为一个消费组协同工,1个消息同时只会被一个Consumer消费到

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken
1
2
3

# 四、备注

# 1、Kafka常用配置项说明

Kafka常用Broker配置说明:

配置项 默认值/示例值 说明
broker.id 0 Broker唯一标识
listeners PLAINTEXT://192.168.88.53:9092 监听信息,PLAINTEXT表示明文传输
log.dirs kafka/logs kafka数据存放地址,可以填写多个。用","间隔
message.max.bytes message.max.bytes 单个消息长度限制,单位是字节
num.partitions 1 默认分区数
log.flush.interval.messages Long.MaxValue 在数据被写入到硬盘和消费者可用前最大累积的消息的数量
log.flush.interval.ms Long.MaxValue 在数据被写入到硬盘前的最大时间
log.flush.scheduler.interval.ms Long.MaxValue 检查数据是否要写入到硬盘的时间间隔。
log.retention.hours 24 控制一个log保留时间,单位:小时
zookeeper.connect 192.168.88.21:2181 ZooKeeper服务器地址,多台用","间隔