云服务器网:购买云服务器和VPS必上的网站!

腾讯云ckafka(腾讯云ckafka api)

本文目录:1、深入理解kafka(五)日志存储2、Kafka简介+Kafka Tool使用简介+使用实例3、Kafka的Topic配置详解深入理解kafka(五)日志存储5.1文件目录布局 根目录下有以下5个checkpoint文件: cleaner-offset-checkpoint, log

本文目录:

  • 1、深入理解kafka(五)日志存储
  • 2、Kafka简介+Kafka Tool使用简介+使用实例
  • 3、Kafka的Topic配置详解

深入理解kafka(五)日志存储

5.1文件目录布局

根目录下有以下5个checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, meta.properties, recovery-point-offset-checkpoint, replication-offset-checkpoint

分区目录下有以下目录: 0000xxx.index(偏移量为64位长整形,长度固定为20位), 0000xxx.log, 0000xxx.timeindex.

还有可能包含.deleted .cleaned .swap等临时文件, 以及可能的.snapshot .txnindex leader-epoch-checkpoint

5.2日志格式演变

5.2.1 v0版本

kafka0.10.0之前

RECORD_OVERHEAD包括offset(8B)和message size(4B)

RECORD包括:

crc32(4B):crc32校验值

magic(1B):消息版本号0

attributes(1B):消息属性。低3位表示压缩类型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(0.9.x引入)

key length(4B):表示消息的key的长度。-1代表null

key: 可选

value length(4B):实际消息体的长度。-1代表null

value: 消息体。可以为空,如墓碑消息

5.2.2 v1版本

kafka0.10.0-0.11.0

比v0多了timestamp(8B)字段,表示消息的时间戳

attributes的第4位也被利用起来,0表示timestamp的类型为CreateTime,1表示timestamp的类型为LogAppendTime

timestamp类型由broker端参数log.message.timestamp.type来配置,默认为CreateTime,即采用生产者创建的时间戳

5.2.3 消息压缩

保证端到端的压缩,服务端配置compression.type,默认为”producer”,表示保留生产者使用的压缩方式,还可以配置为”gzip”,”snappy”,”lz4″

多条消息压缩至value字段,以提高压缩率

5.2.4 变长字段

变长整形(Varints):每一个字节都有一个位于最高位的msb位(most significant bit),除了最后一个字节为1,其余都为0,字节倒序排列

为了使编码更加高效,Varints使用ZigZag编码:sint32对应 (n1)^(n31) sint64对应 (n1)^(n63)

5.2.5 v2版本

Record Batch

first offset:

length:

partition leader epoch:

magic:固定为2

attributes:两个字节。低3位表示压缩格式,第4位表示时间戳类型,第5位表示事务(0-非事务1-事务),第6位控制消息(0-非控制1控制)

first timestamp:

max timestamp:

producer id:

producer epoch:

first sequence:

records count:

v2版本的消息去掉了crc字段,另外增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和headers信息,并且弃用了attributes

Record

length:

attributes:弃用,但仍占据1B

timestamp delta:

offset delta:

headers:

5.3日志索引

稀疏索引(sparse index):每当写入一定量(broker端参数log.index.interval.bytes指定,默认为4096B),偏移量索引文件和时间索引文件分别对应一个索引项

日志段切分策略:

1.大小超过broker端参数log.segment.bytes配置的值,默认为1073741824(1GB)

2.当前日志段消息的最大时间戳与当前系统的时间戳差值大于log.roll.ms或者log.roll.hours,ms优先级高,默认log.roll.hours=168(7天)

3.索引文件或者时间戳索引文件的大小大于log.index.size.max.bytes配置的值,默认为10485760(10MB)

4.偏移量差值(offset-baseOffset)Integer.MAX_VALUE

5.3.1 偏移量索引

每个索引项占用8个字节,分为两个部分:1.relativeOffset相对偏移量(4B) 2.position物理地址(4B)

使用kafka-dump-log.sh脚本来解析.index文件(包括.timeindex、.snapshot、.txnindex等文件),如下:

bin/kafka-dump-log.sh –files /tmp/kafka-logs/topicId-0/00……00.index

如果broker端参数log.index.size.max.bytes不是8的倍数,内部会自动转换为8的倍数

5.3.2 时间戳索引

每个索引项占用12个字节,分为两个部分:1.timestamp当前日志分段的最大时间戳(12B) 2.relativeOffset时间戳对应的相对偏移量(4B)

如果broker端参数log.index.size.max.bytes不是12的倍数,内部会自动转换为12的倍数

5.4日志清理

日志清理策略可以控制到主题级别

5.4.1 日志删除

broker端参数log.cleanup.policy设置为delete(默认为delete)

检测周期broker端参数log.retention.check.interval.ms=300000(默认5分钟)

1.基于时间

broker端参数log.retention.hours,log.retention.minutes,log.retention.ms,优先级msminuteshours

删除时先增加.delete后缀,延迟删除根据file.delete.delay.ms(默认60000)配置

2.基于日志大小

日志总大小为broker端参数log.retention.bytes(默认为-1,表示无穷大)

日志段大小为broker端参数log.segment.bytes(默认为1073741824,1GB)

3.基于日志起始偏移量

DeleteRecordRequest请求

1.KafkaAdminClient的deleteRecord()

2.kafka-delete-record.sh脚本

5.4.2 日志压缩

broker端参数log.cleanup.policy设置为compact,且log.cleaner.enable设置为true(默认为true)

5.5磁盘存储

相关测试:一个由6块7200r/min的RAID-5阵列组成的磁盘簇的线性写入600MB/s,随机写入100KB/s,随机内存写入400MB/s,线性内存3.6GB/s

5.5.1 页缓存

Linux操作系统的vm.dirty_background_ratio参数用来指定脏页数量达到系统的百分比之后就触发pdflush/flush/kdmflush,一般小于10,不建议为0

vm.dirty_ratio表示脏页百分比之后刷盘,但是阻塞新IO请求

kafka同样提供同步刷盘及间断性强制刷盘(fsync)功能,可以通过log.flush.interval.messages、log.flush.interval.ms等参数来控制

kafka不建议使用swap分区,vm.swappiness参数上限为100,下限为0,建议设置为1

5.5.2 磁盘I/O流程

一般磁盘IO的场景有以下4种:

1.用户调用标准C库进行IO操作,数据流为:应用程序Buffer-C库标准IOBuffer-文件系统也缓存-通过具体文件系统到磁盘

2.用户调用文件IO,数据流为:应用程序Buffer-文件系统也缓存-通过具体文件系统到磁盘

3.用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘

4.用户使用类似dd工具,并使用direct参数,绕过系统cache与文件系统直接读写磁盘

Linux系统中IO调度策略有4种:

1.NOOP:no operation

2.CFQ

3.DEADLINE

4.ANTICIPATORY

5.5.3 零拷贝

指数据直接从磁盘文件复制到网卡设备中,不需要经应用程序

对linux而言依赖于底层的sendfile()

对java而言,FileChannal.transferTo()的底层实现就是sendfile()

Kafka简介+Kafka Tool使用简介+使用实例

详细安装访问:

macOS 可以用homebrew快速安装,访问地址:

原文链接:

查看topic列表:

创建topic:

–create :创建命令;

–topic :后面指定topic名称;

–replication-factor :后面指定副本数;

–partitions :指定分区数,根据broker的数量决定;

–zookeeper :后面指定 zookeeper.connect 的zk链接

查看某个topic:

Kafka 作为消息系统的一种, 当然可 以像其他消 息中 间件一样作为消息数据中转的平台。 下面以 Java 语言为例,看一下如何使用 Kafka 来发送和接收消息。

1、引入依赖

2、消息生产者

示例 中用 KafkaProducer 类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。

bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理服务器,则服务器地址之间用逗号分隔, 比如” 192.168.1.1 :9092,192.168.1.2:9092” 。 localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。

key.serializer 和 value.serializer 表示消息的序列化类型 。 Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的 消息序列化为 二 进制类型,示例中是发送文本消息到服务器 , 所以使用的是StringSerializer。

key.deserializer 和 value.deserializer 表示消息的反序列化类型。把来自 Kafka 集群的二进制消 息反序列 化 为指定 的 类型,因为序列化用的是String类型,所以用StringDeserializer 来反序列化。

zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL ,提供了基于 ZooKeeper 的集群服务器自动感知功能, 可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。

有 了 消息生产者之后 , 就可以调用 send 方法发送消息了。该方法的入参是 ProducerRecord类型对象 , ProducerRecord 类提供了多种构造函数形参,常见的有如下三种 :

ProducerRecord(topic,partition,key,value);

ProducerRecord(topic,key,value);

ProducerRecord(topic, value) ;

其中 topic 和 value 是必填的, partition 和 key 是可选的 。如果指定了 pa时tion,那么消息会被发送至指定的 partition ;如果没指定 partition 但指定了 Key,那么消息会按照 hash(key)发送至对应的 partition: 如果既没指定 partition 也没指定 key,那么 消息会按照 round-robin 模式发送(即以轮询的方式依次发送〉到每一个 partition。示例中将向 test-topic 主题发送三条消息。

3、消息消费者

和消息生产者类似,这里用 KafkaConsumer 类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。

bootstrap. servers 和生产者一样,表示 Kafka 集群。

group.id 表示消费者的分组 ID。

enable.auto.commit 表示 Consumer 的 offset 是否自 动提交 。

auto.commit.interval .ms 用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。

key. deserializer 和 value.deserializer 表示用字符串来反序列化消息数据。

消息消费者使用 subscribe 方法 订阅了 Topic 为 test-topic 的消息。 Consumer 调用poll 方法来轮询 Kafka 集群的消息, 一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止 。 如果读取到消息,则打印出消息记录的 pa此ition, offset、key 等。

Kafka的Topic配置详解

配置topic级别参数时,相同(参数)属性topic级别会覆盖全局的,否则默认为全局配置属性值。

创建topic参数可以设置一个或多个–config “Property(属性)”,下面是创建一个topic名称为”my-topic”例子,它设置了2个参数max message size 和 flush rate.

(A)创建topic时配置参数

(B)修改topic时配置参数

覆盖已经有topic参数,下面例子修改”my-topic”的max message属性

(C)删除topic级别配置参数

注:配置的kafka集群的根目录为/config/mobile/mq/mafka02,因此所有节点信息都在此目录下。

cleanup.policy

delete.retention.ms

delete.retention.ms

flush.messages

flush.ms

index.interval.bytes

message.max.bytes

min.cleanable.dirty.ratio

retention.bytes

retention.ms

segment.bytes

segment.index.bytes

log.roll.hours

参考资料:

本文来源:https://www.yuntue.com/post/81060.html | 云服务器网,转载请注明出处!

关于作者: yuntue

云服务器(www.yuntue.com)是一家专门做阿里云服务器代金券、腾讯云服务器优惠券的网站,这里你可以找到阿里云服务器腾讯云服务器等国内主流云服务器优惠价格,以及海外云服务器、vps主机等优惠信息,我们会为你提供性价比最高的云服务器和域名、数据库、CDN、免费邮箱等企业常用互联网资源。

为您推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注