1.CEPH通信连接模型:

ceph消息模型

ceph消息模型

首先通信双方建立socket连接,然后server端会向client发送banner和地址信息

Banner的定义如下:

"ceph %x %xn",protocol_features_suppored,protocol_features_required

在系统中声明为常量,用于交换协议信息。地址信息通过linux的msghdr结构体传递。

Client接收到banner和地址信息验证通过后,也会向server端发送自己的banner和地址信息。
然后client向server端发送connect信息,server端收到后会验证,成功则回复reply connect信息。涉及到的两个结构体如下:

struct ceph_msg_connect {
    __le64 features;     /* supported feature bits */
    __le32 host_type;    /* CEPH_ENTITY_TYPE_* */
    __le32 global_seq;   /* count connections initiated by this host */
    __le32 connect_seq;  /* count connections initiated in this session */
    __le32 protocol_version;
    __le32 authorizer_protocol;
    __le32 authorizer_len;
    __u8  flags;         /* CEPH_MSG_CONNECT_* */
} __attribute__ ((packed));
struct ceph_msg_connect_reply {
    __u8 tag;
    __le64 features;     /* feature bits for this session */
    __le32 global_seq;
    __le32 connect_seq;
    __le32 protocol_version;
    __le32 authorizer_len;
    __u8 flags;
} __attribute__ ((packed));

对于新连接而言,global_seq一般为0。
然后通信双方建立pipe,会话就建立完成了。

2.消息发送接收流程

发送流程如下:
发送流程

发送流程

接收流程如下:
接收流程

接收流程

消息结构介绍:

Message 是所有消息的基类,任何要发送的消息,都要继承该类。对于消息,其发送格式如下:

HeaderUser dataFooter

User data包括:

PayloadMiddleData

payload, 一般保存ceph操作的元数据;middle 预留,目前没有使用到;data 一般为读写的数据。

整个Message的结构如下:

class Message{ 

ceph_msg_header header; *// 消息头* 
ceph_msg_footer footer; *// 消息尾* 
bufferlist payload; *// "front" unaligned blob* 
bufferlist middle; *// "middle" unaligned blob* 
bufferlist data; 
utime_t recv_stamp; *//开始接收数据的时间戳* 
utime_t dispatch_stamp; *// dispatch 的时间戳* 
utime_t throttle_stamp; */\* time at which message was fully read \*/* 
utime_t recv_complete_stamp; *//接收完成的时间戳* 
ConnectionRef connection; *//链接*
uint32_t magic; bi::list_member_hook<> dispatch_q; *//boost::intrusive list 的 member* } 

ceph_msg_header 主要是封装数据相关信息:

struct ceph_msg_header { 

__le64 seq; /* message seq*# for this session ## 当前session内 消息的唯一 序号\*/* __le64 tid; /* transaction id *## 消息的全局唯一的 id\*/* 
__le16 type; /* message type *## 消息类型 \*/* 
__le16 priority; /* priority. higher value == higher priority */ 
__le16 version; /* version of message encoding */

__le32 front_len; /* bytes in main payload *## payload 的长度\*/* 

__le32 middle_len; /* bytes in middle payload *## middle 的长度\*/* 

__le32 data_len; /* bytes of data payload *## data 的 长度 \*/* 

... ... ... } __attribute__ ((packed)); 

ceph_msg_footer 主要是封装结束标记和CRC校验码:

struct ceph_msg_footer { 

__le32 front_crc, middle_crc, data_crc;*//三个部分的 crc 效验码* 

__le64 sig; *// 消息的64位 signature* 

__u8 flags; *//结束标志* } __attribute__ ((packed)); 

3. 消息类型:

3.1 Monitor自身

消息类型消息结构体消息作用处理接口
CEPH_MSG_PINGMPing定期Ping Monitor确认Monitor的存在handle_ping
CEPH_MSG_MON_GET_MAPMMonGetMap认证前获取MonMaphandle_mon_get_map
CEPH_MSG_MON_METADATAMMonMetadata处理保存某个Monitor的系统信息(cpu,内存等)handle_mon_metadata
MSG_MON_COMMANDMMonCommand传递命令行消息给Monitor,Monitor再分发给相应的XXXMonitor进行处理handle_command
CEPH_MSG_MON_GET_VERSIONMMonGetVersion获取cluster map的版本信息handle_get_version
CEPH_MSG_MON_SUBSCRIBEMMonSubscribeCluster map订阅更新handle_subscribe
MSG_ROUTEMRoute路由请求转发(待确认)handle_route
MSG_MON_PROBEMMonProbe启动加入时需要向其他Monitor发送Probe请求handle_probe
MSG_MON_SYNCMMonSync同步Paxos状态数据handle_sync
MSG_MON_SCRUBMMonScrubMonitorDBStore数据一致性检测handle_scrub
MSG_MON_JOINMMonJoin如果不在MonMap中申请加入到MonMapMonmapMonitor::prepare_join
MSG_MON_PAXOSMMonPaxos选举完成后,leader会触发Paxos::leader_init,状态置为STATE_RECOVERING,并发起该消息的OP_COLLECT流程Paxos::dispatch
MSG_MON_ELECTIONMMonElection发起选举流程Elector::dispatch
MSG_FORWARDMForward将请求转发到leaderhandle_forward
MSG_TIMECHECKMTimeCheckMonitor每隔mon_timecheck_interval检测所有Monitor的系统时间来检测节点之间的时间差handle_timecheck
MSG_MON_HEALTHMMonHealth每隔mon_health_data_update_interval检测存放Monitor上面使用的leveldb数据的状态HealthMonitor::service_dispatch

3.2 AuthMonitor

消息类型消息结构体消息作用处理接口
MSG_MON_COMMANDMMonCommand处理ceph auth xxx命令行相关处理preprocess_command处理ceph auth get/export/list等 prepare_command处理ceph auth import/add/get-or-create/caps等
CEPH_MSG_AUTHMAuth实现认证和授权消息处理prep_auth

3.3 OSDMonitor

消息类型消息结构体消息作用处理接口
CEPH_MSG_MON_GET_OSDMAPMMonGetOSDMap获取OSDMappreprocess_get_osdmap
MSG_OSD_MARK_ME_DOWNMOSDMarkMeDownOSD shutdown之前通知Monitor发送该消息preprocess_mark_me_down prepare_mark_me_down
MSG_OSD_FAILUREMOSDFailure1. OSD每隔OSD_TICK_INTERVAL检测心跳无响应的OSD,并将失败的OSD report给Monitor 2. Monitor判断上报次数>=mon_osd_min_down_reports,那么就将target_osd标识为downpreprocess_failure
MSG_OSD_BOOTMOSDBoot新OSD加入时发送请求到Monitor,参考新OSD的加入流程preprocess_bootprepare_boot
MSG_OSD_ALIVEMOSDAliveOSD判断up_thru_wanted决定是否发送请求给Monitor,Monitor发送Incremental OSDMap返回给OSDpreprocess_alive prepare_alive
MSG_OSD_PGTEMPMOSDPGTempPrimary OSD处于backfilling状态无法提供读取服务时,会发送该消息到Monitor,将PG临时映射到其他的OSD上提供去服务preprocess_pgtemp prepare_pgtemp
MSG_REMOVE_SNAPSMRemoveSnaps删除快照信息prepare_remove_snaps
CEPH_MSG_POOLOPMPoolOp删除/创建Pool,创建/删除pool快照等prepare_pool_op

3.4 PGMonitor

消息类型消息结构体消息作用处理接口
CEPH_MSG_STATFSMStatfs返回文件系统osd占用的kb容量handle_statfs
MSG_PGSTATSMPGStats查询或者更新pg状态preprocess_pg_stats prepare_pg_stats
MSG_GETPOOLSTATSMGetPoolStats获取pool汇总状态信息preprocess_getpoolstats
MSG_MON_COMMANDMMonCommand处理ceph pg xxx相关命令行preprocess_command

3.5 MonMapMonitor

消息类型消息结构体消息作用处理接口
MSG_MON_JOINMMonJoin更新MonMappreprocess_join prepare_join
MSG_MON_COMMANDMMonCommand处理ceph mon xxx相关命令行preprocess_command prepare_command

3.6 MDSMonitor

消息类型消息结构体消息作用处理接口
MSG_MDS_BEACON
MSG_MDS_OFFLOAD_TARGETS

附:

Msgr.h文件:

1.Msgr.h文件:定义消息传输层的数据类型,以供ceph使用

(1)默认的监控端口:

#define CEPH_MON_PORT 6789

(2)客户端处理端口范围定义:

#define CEPH_PORT_FIRST 6789//监控

#define CEPH_PORT_START 6800 //开始

#define CEPH_PORT_LAST 6900//结束

(3)tcp协议标识和版本信息:

#define CEPH_BANNER "ceph v027"

#define CEPH_BANNER_MAX_LEN 30//最大长度

(4)ceph中的实体名称:在网络传输中使用,例如mds0表示元数据服务器0

struct ceph_entity_name {

_u8 type; /* CEPH_ENTITY_TYPE* /*

__le64 num;

} attribute ((packed));//按照紧凑模式分配内存,而不是内存对齐

#define CEPH_ENTITY_TYPE_MON 0x01//监控服务器实体

#define CEPH_ENTITY_TYPE_MDS 0x02//元数据服务器实体

#define CEPH_ENTITY_TYPE_OSD 0x04//对象存储设备实体

#define CEPH_ENTITY_TYPE_CLIENT 0x08//客户端实体

#define CEPH_ENTITY_TYPE_AUTH 0x20//权限实体

#define CEPH_ENTITY_TYPE_ANY 0xFF//任何

同时提供一个函数用于将类型转换为名字字符串的函数如下:

const char *ceph_entity_type_name(int type)

{

switch (type) {

case CEPH_ENTITY_TYPE_MDS: return "mds";

case CEPH_ENTITY_TYPE_OSD: return "osd";

case CEPH_ENTITY_TYPE_MON: return "mon";

case CEPH_ENTITY_TYPE_CLIENT: return "client";

case CEPH_ENTITY_TYPE_AUTH: return "auth";

default: return "unknown";

}

}

(5)实体网络地址以及与名字实体的对应关系结构体

struct ceph_entity_addr {

__le32 type;

__le32 nonce; /* unique id for process (e.g. pid) */

struct sockaddr_storage in_addr;

} attribute ((packed));

struct ceph_entity_inst {

struct ceph_entity_name name;

struct ceph_entity_addr addr;

} attribute ((packed));

(6)消息交换协议定义:

#define CEPH_MSGR_TAG_READY 1 /* server->client: ready for messages */

#define CEPH_MSGR_TAG_RESETSESSION 2 /* server->client: reset, try again */

#define CEPH_MSGR_TAG_WAIT 3 /* server->client: wait for racing

incoming connection */

#define CEPH_MSGR_TAG_RETRY_SESSION 4 /* server->client + cseq: try again

with higher cseq */

#define CEPH_MSGR_TAG_RETRY_GLOBAL 5 /* server->client + gseq: try again

with higher gseq */

#define CEPH_MSGR_TAG_CLOSE 6 /* closing pipe */

#define CEPH_MSGR_TAG_MSG 7 /* message */

#define CEPH_MSGR_TAG_ACK 8 /* message ack */

#define CEPH_MSGR_TAG_KEEPALIVE 9 /* just a keepalive byte! */

#define CEPH_MSGR_TAG_BADPROTOVER 10 /* bad protocol version */

#define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */

#define CEPH_MSGR_TAG_FEATURES 12 /* insufficient features */

#define CEPH_MSGR_TAG_SEQ 13 /* 64-bit int follows with seen seq number */

(7)连接协商

struct ceph_msg_connect {//连接消息结构体

__le64 features; /* supported feature bits */支持的特征位

_le32 host_type; /* CEPH_ENTITY_TYPE* /*上面提到的实体类型

__le32 global_seq; /* count connections initiated by this host */主机初始化的连接数量

__le32 connect_seq; /* count connections initiated in this session */在这个对话中的连接数量

__le32 protocol_version;//协议版本

__le32 authorizer_protocol;//权限协议

__le32 authorizer_len;//权限长度

_u8 flags; /* CEPH_MSG_CONNECT* /*

} attribute ((packed));

struct ceph_msg_connect_reply {//连接回复消息结构体

__u8 tag;//上面定义传递消息的类型

__le64 features; /* feature bits for this session */这个对话的特征位

__le32 global_seq;

__le32 connect_seq;

__le32 protocol_version;

__le32 authorizer_len;

__u8 flags;

} attribute ((packed));

#define CEPH_MSG_CONNECT_LOSSY 1 /* messages i send may be safely dropped */

(8)消息头部结构:有老的

struct ceph_msg_header {

__le64 seq; /* message seq# for this session */

__le64 tid; /* transaction id */

__le16 type; /* message type */

__le16 priority; /* priority. higher value == higher priority */

__le16 version; /* version of message encoding */

__le32 front_len; /* bytes in main payload */

__le32 middle_len;/* bytes in middle payload */

__le32 data_len; /* bytes of data payload */

__le16 data_off; /* sender: include full offset;

receiver: mask against ~PAGE_MASK */

struct ceph_entity_name src;

* oldest code we think can decode this. unknown if zero. */

__le16 compat_version;

__le16 reserved;

__le32 crc; /* header crc32c */

} attribute ((packed));

#define CEPH_MSG_PRIO_LOW 64

#define CEPH_MSG_PRIO_DEFAULT 127

#define CEPH_MSG_PRIO_HIGH 196

#define CEPH_MSG_PRIO_HIGHEST 255

(9)数据有效载荷结构

struct ceph_msg_footer {

__le32 front_crc, middle_crc, data_crc;

__u8 flags;

} attribute ((packed));

#define CEPH_MSG_FOOTER_COMPLETE (1<<0) /* msg wasn't aborted */

#define CEPH_MSG_FOOTER_NOCRC (1<<1) /* no data crc */

MSGR2协议

定义:

client(C):发起(TCP)连接的一方

server(S):接受(TCP)连接的一方

connection:两个进程之间的(TCP)连接的实例。

entity:ceph实体实例,例如“ osd.0”。每个实体凭借“ nonce”字段(通常是pid或随机值)具有一个或多个唯一的entity_addr_t。

session:两个实体之间的有状态会话,其中消息交换是有序的,并且是无损的。如果存在中断(TCP连接断开连接),则会话可能跨越多个连接。

frame:在对等体之间发送的离散消息。每个帧都包含一个标签(类型代码),有效负载以及(如果启用了签名或加密)其他一些字段。有关结构,请参见下文。

tag:与框架关联的类型代码。标签确定有效载荷的结构。

阶段:

连接具有四个不同的阶段:

1.Banner

2.认证帧交换

3.消息流握手帧交换

4.消息帧交换

BANNER:

客户端和服务器在连接后均会发送

"ceph %x %xn",protocol_features_suppored, protocol_features_required

初始化为“ceph 0 0n”.

帧格式:

发送或接收的所有其他数据都包含在一个帧中。每个框架具有以下形式:

frame_len (le32)

tag (TAG_* le32)

frame_header_checksum (le32)

payload

[payload padding -- only present after stream auth phase]

[signature -- only present after stream auth phase]

frame_header_checksum仅在frame_len和tag(8个字节)上。

frame_len包括frame_len le32之后到帧末尾的所有内容(所有 payloads, signatures,和填充(padding))。

payload 的格式和长度由tag确定。

仅当身份验证阶段已完成(TAG_AUTH_DONE已发送)并且启用了签名时,signatures 部分才存在。

Last modification:October 20th, 2019 at 12:01 pm
如果觉得我的文章对你有用,请随意赞赏