本文目录:
- 1、深入理解kafka(五)日志存储
- 2、Kafka简介+Kafka Tool使用简介+使用实例
- 3、Kafka的Topic配置详解
深入理解kafka(五)日志存储
5.1文件目录布局
根目录下有以下5个checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, meta.properties, recovery-point-offset-checkpoint, replication-offset-checkpoint
分区目录下有以下目录: 0000xxx.index(偏移量为64位长整形,长度固定为20位), 0000xxx.log, 0000xxx.timeindex.
还有可能包含.deleted .cleaned .swap等临时文件, 以及可能的.snapshot .txnindex leader-epoch-checkpoint
5.2日志格式演变
5.2.1 v0版本
kafka0.10.0之前
RECORD_OVERHEAD包括offset(8B)和message size(4B)
RECORD包括:
crc32(4B):crc32校验值
magic(1B):消息版本号0
attributes(1B):消息属性。低3位表示压缩类型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(0.9.x引入)
key length(4B):表示消息的key的长度。-1代表null
key: 可选
value length(4B):实际消息体的长度。-1代表null
value: 消息体。可以为空,如墓碑消息
5.2.2 v1版本
kafka0.10.0-0.11.0
比v0多了timestamp(8B)字段,表示消息的时间戳
attributes的第4位也被利用起来,0表示timestamp的类型为CreateTime,1表示timestamp的类型为LogAppendTime
timestamp类型由broker端参数log.message.timestamp.type来配置,默认为CreateTime,即采用生产者创建的时间戳
5.2.3 消息压缩
保证端到端的压缩,服务端配置compression.type,默认为”producer”,表示保留生产者使用的压缩方式,还可以配置为”gzip”,”snappy”,”lz4″
多条消息压缩至value字段,以提高压缩率
5.2.4 变长字段
变长整形(Varints):每一个字节都有一个位于最高位的msb位(most significant bit),除了最后一个字节为1,其余都为0,字节倒序排列
为了使编码更加高效,Varints使用ZigZag编码:sint32对应 (n1)^(n31) sint64对应 (n1)^(n63)
5.2.5 v2版本
Record Batch
first offset:
length:
partition leader epoch:
magic:固定为2
attributes:两个字节。低3位表示压缩格式,第4位表示时间戳类型,第5位表示事务(0-非事务1-事务),第6位控制消息(0-非控制1控制)
first timestamp:
max timestamp:
producer id:
producer epoch:
first sequence:
records count:
v2版本的消息去掉了crc字段,另外增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和headers信息,并且弃用了attributes
Record
length:
attributes:弃用,但仍占据1B
timestamp delta:
offset delta:
headers:
5.3日志索引
稀疏索引(sparse index):每当写入一定量(broker端参数log.index.interval.bytes指定,默认为4096B),偏移量索引文件和时间索引文件分别对应一个索引项
日志段切分策略:
1.大小超过broker端参数log.segment.bytes配置的值,默认为1073741824(1GB)
2.当前日志段消息的最大时间戳与当前系统的时间戳差值大于log.roll.ms或者log.roll.hours,ms优先级高,默认log.roll.hours=168(7天)
3.索引文件或者时间戳索引文件的大小大于log.index.size.max.bytes配置的值,默认为10485760(10MB)
4.偏移量差值(offset-baseOffset)Integer.MAX_VALUE
5.3.1 偏移量索引
每个索引项占用8个字节,分为两个部分:1.relativeOffset相对偏移量(4B) 2.position物理地址(4B)
使用kafka-dump-log.sh脚本来解析.index文件(包括.timeindex、.snapshot、.txnindex等文件),如下:
bin/kafka-dump-log.sh –files /tmp/kafka-logs/topicId-0/00……00.index
如果broker端参数log.index.size.max.bytes不是8的倍数,内部会自动转换为8的倍数
5.3.2 时间戳索引
每个索引项占用12个字节,分为两个部分:1.timestamp当前日志分段的最大时间戳(12B) 2.relativeOffset时间戳对应的相对偏移量(4B)
如果broker端参数log.index.size.max.bytes不是12的倍数,内部会自动转换为12的倍数
5.4日志清理
日志清理策略可以控制到主题级别
5.4.1 日志删除
broker端参数log.cleanup.policy设置为delete(默认为delete)
检测周期broker端参数log.retention.check.interval.ms=300000(默认5分钟)
1.基于时间
broker端参数log.retention.hours,log.retention.minutes,log.retention.ms,优先级msminuteshours
删除时先增加.delete后缀,延迟删除根据file.delete.delay.ms(默认60000)配置
2.基于日志大小
日志总大小为broker端参数log.retention.bytes(默认为-1,表示无穷大)
日志段大小为broker端参数log.segment.bytes(默认为1073741824,1GB)
3.基于日志起始偏移量
DeleteRecordRequest请求
1.KafkaAdminClient的deleteRecord()
2.kafka-delete-record.sh脚本
5.4.2 日志压缩
broker端参数log.cleanup.policy设置为compact,且log.cleaner.enable设置为true(默认为true)
5.5磁盘存储
相关测试:一个由6块7200r/min的RAID-5阵列组成的磁盘簇的线性写入600MB/s,随机写入100KB/s,随机内存写入400MB/s,线性内存3.6GB/s
5.5.1 页缓存
Linux操作系统的vm.dirty_background_ratio参数用来指定脏页数量达到系统的百分比之后就触发pdflush/flush/kdmflush,一般小于10,不建议为0
vm.dirty_ratio表示脏页百分比之后刷盘,但是阻塞新IO请求
kafka同样提供同步刷盘及间断性强制刷盘(fsync)功能,可以通过log.flush.interval.messages、log.flush.interval.ms等参数来控制
kafka不建议使用swap分区,vm.swappiness参数上限为100,下限为0,建议设置为1
5.5.2 磁盘I/O流程
一般磁盘IO的场景有以下4种:
1.用户调用标准C库进行IO操作,数据流为:应用程序Buffer-C库标准IOBuffer-文件系统也缓存-通过具体文件系统到磁盘
2.用户调用文件IO,数据流为:应用程序Buffer-文件系统也缓存-通过具体文件系统到磁盘
3.用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘
4.用户使用类似dd工具,并使用direct参数,绕过系统cache与文件系统直接读写磁盘
Linux系统中IO调度策略有4种:
1.NOOP:no operation
2.CFQ
3.DEADLINE
4.ANTICIPATORY
5.5.3 零拷贝
指数据直接从磁盘文件复制到网卡设备中,不需要经应用程序
对linux而言依赖于底层的sendfile()
对java而言,FileChannal.transferTo()的底层实现就是sendfile()
Kafka简介+Kafka Tool使用简介+使用实例
详细安装访问:
macOS 可以用homebrew快速安装,访问地址:
原文链接:
查看topic列表:
创建topic:
–create :创建命令;
–topic :后面指定topic名称;
–replication-factor :后面指定副本数;
–partitions :指定分区数,根据broker的数量决定;
–zookeeper :后面指定 zookeeper.connect 的zk链接
查看某个topic:
Kafka 作为消息系统的一种, 当然可 以像其他消 息中 间件一样作为消息数据中转的平台。 下面以 Java 语言为例,看一下如何使用 Kafka 来发送和接收消息。
1、引入依赖
2、消息生产者
示例 中用 KafkaProducer 类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。
bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理服务器,则服务器地址之间用逗号分隔, 比如” 192.168.1.1 :9092,192.168.1.2:9092” 。 localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。
key.serializer 和 value.serializer 表示消息的序列化类型 。 Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的 消息序列化为 二 进制类型,示例中是发送文本消息到服务器 , 所以使用的是StringSerializer。
key.deserializer 和 value.deserializer 表示消息的反序列化类型。把来自 Kafka 集群的二进制消 息反序列 化 为指定 的 类型,因为序列化用的是String类型,所以用StringDeserializer 来反序列化。
zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL ,提供了基于 ZooKeeper 的集群服务器自动感知功能, 可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。
有 了 消息生产者之后 , 就可以调用 send 方法发送消息了。该方法的入参是 ProducerRecord类型对象 , ProducerRecord 类提供了多种构造函数形参,常见的有如下三种 :
ProducerRecord(topic,partition,key,value);
ProducerRecord(topic,key,value);
ProducerRecord(topic, value) ;
其中 topic 和 value 是必填的, partition 和 key 是可选的 。如果指定了 pa时tion,那么消息会被发送至指定的 partition ;如果没指定 partition 但指定了 Key,那么消息会按照 hash(key)发送至对应的 partition: 如果既没指定 partition 也没指定 key,那么 消息会按照 round-robin 模式发送(即以轮询的方式依次发送〉到每一个 partition。示例中将向 test-topic 主题发送三条消息。
3、消息消费者
和消息生产者类似,这里用 KafkaConsumer 类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。
bootstrap. servers 和生产者一样,表示 Kafka 集群。
group.id 表示消费者的分组 ID。
enable.auto.commit 表示 Consumer 的 offset 是否自 动提交 。
auto.commit.interval .ms 用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。
key. deserializer 和 value.deserializer 表示用字符串来反序列化消息数据。
消息消费者使用 subscribe 方法 订阅了 Topic 为 test-topic 的消息。 Consumer 调用poll 方法来轮询 Kafka 集群的消息, 一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止 。 如果读取到消息,则打印出消息记录的 pa此ition, offset、key 等。
Kafka的Topic配置详解
配置topic级别参数时,相同(参数)属性topic级别会覆盖全局的,否则默认为全局配置属性值。
创建topic参数可以设置一个或多个–config “Property(属性)”,下面是创建一个topic名称为”my-topic”例子,它设置了2个参数max message size 和 flush rate.
(A)创建topic时配置参数
(B)修改topic时配置参数
覆盖已经有topic参数,下面例子修改”my-topic”的max message属性
(C)删除topic级别配置参数
注:配置的kafka集群的根目录为/config/mobile/mq/mafka02,因此所有节点信息都在此目录下。
cleanup.policy
delete.retention.ms
delete.retention.ms
flush.messages
flush.ms
index.interval.bytes
message.max.bytes
min.cleanable.dirty.ratio
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
log.roll.hours
参考资料:
本文来源:https://www.yuntue.com/post/81060.html | 云服务器网,转载请注明出处!

微信扫一扫打赏
支付宝扫一扫打赏