Installing a Kafka and ZooKeeper Cluster on CentOS 7
Kafka provides a framework for storing, reading, and analyzing streaming data.
System configuration
yum update -y
yum install vim wget net-tools -y
setenforce 0 && sed -i 's/^SELINUX=enforcing$/SELINUX=permissive/' /etc/selinux/config
systemctl stop firewalld && systemctl disable firewalld && systemctl status firewalld
hostnamectl --static set-hostname Kafka3 && su    # set the matching hostname (Kafka1/Kafka2/Kafka3) on each node
vim /etc/hosts
Add:
192.168.0.14 Kafka1
192.168.0.15 Kafka2
192.168.0.16 Kafka3
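Optionally, a quick sanity check that the host entries resolve and are reachable from the current node (a minimal sketch):
for h in Kafka1 Kafka2 Kafka3; do
    ping -c 1 -W 1 "$h" >/dev/null && echo "$h reachable" || echo "$h NOT reachable"
done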
Install the JDK
yum install java-1.8.0 -y
java -version
Install ZooKeeper
cd /usr/local/
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz
mv apache-zookeeper-3.6.3-bin zookeeper
rm -rf apache-zookeeper-3.6.3-bin.tar.gz
vim /etc/profile
Add:
export ZOOKEEPER_HOME=/usr/local/zookeeper/
export PATH=$ZOOKEEPER_HOME/bin:$PATH
Run source /etc/profile
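Instead of downloading on every node, the extracted directory can also be copied to the other hosts (a sketch; assumes root SSH access between the nodes; /etc/profile still has to be edited and sourced on each node):
scp -r /usr/local/zookeeper root@192.168.0.15:/usr/local/
scp -r /usr/local/zookeeper root@192.168.0.16:/usr/local/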
Edit the ZooKeeper configuration file
mkdir -p /usr/local/zookeeper/data
mkdir -p /usr/local/zookeeper/log
vim /usr/local/zookeeper/conf/zoo.cfg
#more /usr/local/zookeeper/conf/zoo_sample.cfg|grep -Ev "^#|^$"
Change to:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/log
admin.serverPort=8090
clientPort=2181
server.1=192.168.0.14:2888:3888
server.2=192.168.0.15:2888:3888
server.3=192.168.0.16:2888:3888
Set the ZooKeeper cluster id (each node's id must be unique)
# Node 1
echo "1" > /usr/local/zookeeper/data/myid
# Node 2
echo "2" > /usr/local/zookeeper/data/myid
# Node 3
echo "3" > /usr/local/zookeeper/data/myid
cat /usr/local/zookeeper/data/myid
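A quick way to confirm the three nodes ended up with distinct ids (a sketch; assumes root SSH access between the nodes):
for ip in 192.168.0.14 192.168.0.15 192.168.0.16; do
    echo -n "$ip: "
    ssh root@"$ip" cat /usr/local/zookeeper/data/myid
done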
Configure start on boot
vim /usr/lib/systemd/system/zookeeper.service
Add:
[Unit]
Description=Zookeeper.service
After=network.target

[Service]
Type=forking
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart

[Install]
WantedBy=multi-user.target

Run:
systemctl daemon-reload && systemctl enable zookeeper.service && systemctl start zookeeper && systemctl status zookeeper
Test the ZooKeeper cluster
cd /usr/local/zookeeper/
bin/zkServer.sh status   # shows whether the current node is a follower or the leader
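One node should report Mode: leader and the others Mode: follower. The cluster ports can also be checked (net-tools was installed earlier; 2888 is only bound on the current leader):
netstat -lntp | grep -E '2181|2888|3888'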
Install Kafka
cd /usr/local/
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
scp -r kafka_2.13-2.8.0.tgz root@192.168.0.15:/usr/local/
scp -r kafka_2.13-2.8.0.tgz root@192.168.0.16:/usr/local/
tar -zxvf kafka_2.13-2.8.0.tgz
mv kafka_2.13-2.8.0 kafka
rm -rf kafka_2.13-2.8.0.tgz
mkdir /usr/local/kafka/kafkalogs
vim /etc/profile
Add:
export KAFKA_HOME=/usr/local/kafka/
export PATH=$PATH:$KAFKA_HOME/bin
Run source /etc/profile
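A quick sanity check that the Kafka scripts are now on the PATH (a minimal sketch; the output should show the installed version):
kafka-topics.sh --version   # should print 2.8.0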
Edit the Kafka configuration file
echo "" > /usr/local/kafka/config/server.properties vim /usr/local/kafka/config/server.properties #cat /usr/local/kafka/config/server.properties |grep -Ev "^#|^$" #节点1改为 broker.id=1 listeners=PLAINTEXT://Kafka1:9092 host.name=Kafka1 advertised.listeners=PLAINTEXT://Kafka1:9092 advertised.host.name=Kafka1 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/usr/local/kafka/kafkalogs num.partitions=3 delete.topic.enable=true replica.fetch.max.bytes=5242880 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 message.max.byte=5242880 log.cleaner.enable=true log.retention.hours=72 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.0.14:2181,192.168.0.15:2181,192.168.0.16:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 #节点2改为 broker.id=2 listeners=PLAINTEXT://Kafka2:9092 host.name=Kafka2 advertised.listeners=PLAINTEXT://Kafka2:9092 advertised.host.name=Kafka2 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/usr/local/kafka/kafkalogs num.partitions=3 delete.topic.enable=true replica.fetch.max.bytes=5242880 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 message.max.byte=5242880 log.cleaner.enable=true log.retention.hours=72 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.0.14:2181,192.168.0.15:2181,192.168.0.16:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 #节点3改为 broker.id=3 listeners=PLAINTEXT://Kafka3:9092 host.name=Kafka3 advertised.listeners=PLAINTEXT://Kafka3:9092 advertised.host.name=Kafka3 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/usr/local/kafka/kafkalogs num.partitions=3 delete.topic.enable=true replica.fetch.max.bytes=5242880 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 message.max.byte=5242880 log.cleaner.enable=true log.retention.hours=72 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.0.14:2181,192.168.0.15:2181,192.168.0.16:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
Configure start on boot
vim /usr/lib/systemd/system/kafka.service
Add (note that systemd does not run Exec lines through a shell, so the reload command is wrapped in /bin/bash -c):
[Unit]
Description=kafka.service
After=network.target zookeeper.service

[Service]
Type=forking
Restart=always
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecReload=/bin/bash -c '/usr/local/kafka/bin/kafka-server-stop.sh && sleep 2 && /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties'
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

Run:
systemctl daemon-reload && systemctl enable kafka.service && systemctl start kafka && systemctl status kafka
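Once the service is running on all three machines, the brokers should be registered in ZooKeeper; a quick check with the zookeeper-shell script that ships with Kafka:
/usr/local/kafka/bin/zookeeper-shell.sh 192.168.0.14:2181 ls /brokers/ids
# the last line of output should be [1, 2, 3]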
Testing
Create a topic named my-topic with 3 replicas and 3 partitions
cd /usr/local/kafka/bin
./kafka-topics.sh --create --zookeeper 192.168.0.14:2181,192.168.0.15:2181,192.168.0.16:2181 --replication-factor 3 --partitions 3 --topic my-topic
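The --zookeeper flag is deprecated in Kafka 2.8; the equivalent command can talk to the brokers directly:
./kafka-topics.sh --create --bootstrap-server 192.168.0.14:9092 --replication-factor 3 --partitions 3 --topic my-topic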
List topics
./kafka-topics.sh --list --zookeeper 192.168.0.14:2181,192.168.0.15:2181,192.168.0.16:2181
Show topic details
./kafka-topics.sh --describe --zookeeper 192.168.0.14:2181,192.168.0.15:2181,192.168.0.16:2181 --topic my-topic
On node 1, act as a producer and send a message
./kafka-console-producer.sh --broker-list 192.168.0.14:9092 --topic my-topic
Type hello world
On nodes 2 and 3, act as consumers and read the message
./kafka-console-consumer.sh --bootstrap-server 192.168.0.14:9092,192.168.0.15:9092,192.168.0.16:9092 --topic my-topic --from-beginning
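Started this way, each consumer receives every message. To see the partitions split between the two consumers instead, both can join the same consumer group (the group name test-group is just an example):
./kafka-console-consumer.sh --bootstrap-server 192.168.0.14:9092,192.168.0.15:9092,192.168.0.16:9092 --topic my-topic --group test-group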
On node 1, delete the topic named my-topic
./kafka-topics.sh --delete --topic my-topic --zookeeper 192.168.0.14:2181,192.168.0.15:2181,192.168.0.16:2181
After the deletion, the consumers on nodes 2 and 3 print a warning.
Check whether the topic still exists
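For example, listing the topics again should no longer show it:
./kafka-topics.sh --list --zookeeper 192.168.0.14:2181,192.168.0.15:2181,192.168.0.16:2181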
Install kafka-eagle
Download the release package
# Download page: http://download.kafka-eagle.org/
cd /usr/local
wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.0.6.tar.gz -O kafka-eagle-bin-2.0.6.tar.gz
tar -zxvf kafka-eagle-bin-2.0.6.tar.gz
rm -rf kafka-eagle-bin-2.0.6.tar.gz
cd /usr/local/kafka-eagle-bin-2.0.6
tar xf kafka-eagle-web-2.0.6-bin.tar.gz
mv kafka-eagle-web-2.0.6 /usr/local/kafka-eagle
rm -rf /usr/local/kafka-eagle-bin-2.0.6.tar.gz /usr/local/kafka-eagle-bin-2.0.6
Configure the OpenJDK environment variables
yum install java-1.8.0-openjdk-devel -y
vim /etc/profile
Add:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export KE_HOME=/usr/local/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
Run source /etc/profile
Edit the configuration file
vim /usr/local/kafka-eagle/conf/system-config.properties
Change to:
######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.0.14:2181,192.168.0.15:2181,192.168.0.16:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.kafka.eagle.jmx.acl=false
cluster1.kafka.eagle.jmx.user=keadmin
cluster1.kafka.eagle.jmx.password=keadmin123
cluster1.kafka.eagle.jmx.ssl=false
cluster1.kafka.eagle.jmx.truststore.location=/Users/dengjie/workspace/ssl/certificates/kafka.truststore
cluster1.kafka.eagle.jmx.truststore.password=ke123456

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# kafka jmx uri
######################################
cluster1.kafka.eagle.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15

######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.topic.preview.records.max=10

######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.blacklist.topics=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=
cluster2.kafka.eagle.sasl.enable=false
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.kafka.eagle.sasl.client.id=
cluster2.kafka.eagle.blacklist.topics=
cluster2.kafka.eagle.sasl.cgroup.enable=false
cluster2.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.kafka.eagle.ssl.enable=false
cluster3.kafka.eagle.ssl.protocol=SSL
cluster3.kafka.eagle.ssl.truststore.location=
cluster3.kafka.eagle.ssl.truststore.password=
cluster3.kafka.eagle.ssl.keystore.location=
cluster3.kafka.eagle.ssl.keystore.password=
cluster3.kafka.eagle.ssl.key.password=
cluster3.kafka.eagle.ssl.endpoint.identification.algorithm=https
cluster3.kafka.eagle.blacklist.topics=
cluster3.kafka.eagle.ssl.cgroup.enable=false
cluster3.kafka.eagle.ssl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/usr/local/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
#kafka.eagle.driver=com.mysql.cj.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=123456
Start the program
cd ${KE_HOME}/bin
chmod +x ke.sh
./ke.sh start
# Stop the service
$KE_HOME/bin/ke.sh stop
# Restart the service
$KE_HOME/bin/ke.sh restart
# Check the service status
$KE_HOME/bin/ke.sh status
# View runtime statistics
$KE_HOME/bin/ke.sh stats
# Follow the service log
tail -f $KE_HOME/logs/ke_console.out
Log in at http://192.168.0.14:8048/cluster/info
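Unless the defaults have been changed, kafka-eagle 2.0.x logs in with the account admin and password 123456.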
Fixing the error: java.io.IOException cannot be cast to javax.management.remote.JMXConnector
This error means JMX has not been enabled.
kafka-eagle is also unable to retrieve the corresponding metrics.
Enable JMX for Kafka
vim /usr/local/kafka/bin/kafka-server-start.sh
Add on the second-to-last line:
export JMX_PORT="9988"
Run:
systemctl restart kafka && systemctl status kafka
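Confirm that the JMX port is listening (net-tools was installed earlier):
netstat -lntp | grep 9988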
The port is now listening.
The Kafka metrics now display correctly.
Enable JMX for ZooKeeper
vim /usr/local/zookeeper/conf/java.env
Add (use each node's own IP):
JMXHOSTNAME="192.168.0.14"
JMXPORT=8899
vim /usr/local/zookeeper/bin/zkServer.sh
Find ZOOMAIN="-Dcom.sun.management.jmxremote on line 71 and add -Djava.rmi.server.hostname=$JMXHOSTNAME
On line 77 add ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
The port is now listening.
vim /usr/local/zookeeper/conf/zoo.cfg
Add:
4lw.commands.whitelist=*
systemctl restart zookeeper && systemctl status zookeeper
yum install nc -y
echo stat|nc 127.0.0.1 2181
The ZooKeeper metrics now display correctly.