快捷搜索:   nginx

站内短信等web2.0应用的百万级消息机制简单实现


     SNS,微博等web2.0站点形如校内,新浪微博等,其最大的应用就在于动态分发,小纸条,打招呼等,比如当你发布一条动态时你的所有好友或关注者都能看到你的动态。特别是微博,每日的动态更是几千万条,上亿条(具体有多少,我也没有估算过)。前几天,新浪微博还挂过一天,主要原因就是服务器压力过大。由于动态分发更新频繁,数据量小而多,涉及到频繁的写操作和读操作,所以是整个类SNS应用网站的一个核心模块,也是压力最大的模块。国内比较出名的SNS一个就是校内,一个就是针对中小网站用户的UC home,当然也还有其他实现,微薄也是SNS的一种变体。当数据量大的时候,特别是动态更新频繁的场景,那么无疑对服务器产生的压力是很大的。
     一般的,类SNS的动态机制是这么实现的。当你发布一条动态的时候,首先给自己写入,插入feed表,然后判断当前用户组,如果是普通好友的话,则给所有好友发布广播,往feedbroadcast表写入冗余数据。如果是公共主页的话,则给好友和关注者发送广播。这样存在的一个问题,就是这种同步的“拉”机制,会频繁不间断的读写数据库,给服务器造成很大压力。而好友动态本身又不必要十分高的实时要求。所以,这里,我们将采用异步的推,把动态推给好友,而不是好友主动拉取。
   随着网站用户数量的增加,在前端页面直接写入大量数据,延长了用户的等待时间,在多并发的情况下,页面效率低,压力集中。为了解决上述问题,考虑使用JMS来解决此问题,实现数据的异步处理。而消息队列的背后实质就是一种“异步处理”的架构思想。本文只是一种简单的模拟。
      “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。


   消息的发送
 消息发送很简单,在动态产生之后,直接在DB中插入一条记录即可。
    消息的接收
通过定时的轮询机制,消息接收者从消息队列表中获取对应topic类型的,“未处理”的消息。
接收者从已获取到的消息列表中取出下一条消息,进行业务逻辑处理,成功后将该消息标记为“已处理”。通过这种简单的ACK机制可以保证消息保证消息不会丢失或者被遗漏处理。

如图所示。我们目前要模拟的就是这个过程。对此,我们需要在数据库中三个存储过程,一个定时器来实现模拟。
Proc_msg_receiver_friend    
接收并处理好友的动态分发    从消息队列中获取消息
业务处理:给好友发动态

Proc_msg_receiver_vip    
接收并处理公共主页的动态分发    从消息队列中获取消息
业务处理:给关注者发消息

msg_scheduler    
调度程序    
根据服务器访问的高峰期不同,调度消息接收者从消息队列中获取消息

Proc_msg_cleaner    
消息删除程序    
定期删除消息队列中已经接收的数据。


还需要一个消息对列表,其结构如下:
DROP TABLE IF EXISTS `eventqueue`;
CREATE TABLE IF NOT EXISTS `eventqueue` (
  `qid` int(11) NOT NULL AUTO_INCREMENT COMMENT '消息序列',
  `topic` tinyint(4) NOT NULL COMMENT '应用类别',
  `status` tinyint(4) NOT NULL COMMENT '状态,标识是否进行分发,0未分发,1已分发',
  `data` varchar(1024) COLLATE gbk_bin DEFAULT NULL COMMENT '消息内容',
  `uid` int(11) NOT NULL COMMENT '动态产生者的uid',
  `create_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '发送时间',
  PRIMARY KEY (`qid`)
) ENGINE=MyISAM DEFAULT CHARSET=gbk COLLATE=gbk_bin COMMENT='消息队列表,用以记录消息的属性及内容';
至于feed表,feedbroadcast,friend等相关表不做概述。
这里是消息清除定时器
DROP EVENT IF EXISTS `event_msg_cleaner`;
DELIMITER //
CREATE EVENT `event_msg_cleaner` ON SCHEDULE EVERY 1 HOUR STARTS '2010-12-01 00:00:00' ON COMPLETION NOT PRESERVE ENABLE COMMENT '删除已处理消息调度' DO DELETE FROM ens_eventqueue WHERE STATUS=1//
DELIMITER ;
接下来,再来看一个好友动态分发的存储过程
DROP PROCEDURE IF EXISTS `proc_msg_receiver_friend`;
DELIMITER //
CREATE DEFINER=`TEST`@`%` PROCEDURE `proc_msg_receiver_friend`()
    COMMENT '消息队列中好友分发存储过程'
BEGIN
  DECLARE CONTINUE HANDLER FOR SQLEXCEPTION  ROLLBACK;    
  START TRANSACTION;
  -- 好友动态分发
  INSERT INTO feed_broadcast
        SELECT feed_id, ef.fri_uid, temp.create_date, app_id, src_type
          FROM (SELECT *
                   FROM eventqueue eq
                  WHERE eq.status=0 AND eq.topic=1
                  ORDER BY create_date DESC LIMIT 100) temp,
             friend ef
         WHERE temp.uid = ef.uid;
    -- 更改动态消息状态
    UPDATE eventqueue eq SET status=1
     WHERE eq.status=0 AND eq.topic=1
      ORDER BY create_date DESC LIMIT 100;
        
    COMMIT;
END//
DELIMITER ;
   看到这里,你会觉得这个思路其实和常规实现也没多大区别,是的。确实是区别不大。只不过把页面上的拉取操作变为服务器上的推操作。但是,还有一个调用机制没有提到。就是我怎么去处理消息队列,何时处理,处理多少,当服务器高峰时怎么处理。现在就到了最关键的一步了,那就是消息调度程序,也是一个作业程序。对于这个实现,通常有JMS,Quartz,FIFO,redis等多种实现方式。这些方式都可支持java和php的实现。为了简化实现和易于迁移,我们采用了Quartz来管理作业调度。当服务器高峰时,放缓对列表的调度,空闲时,把累计的队列进行处理。
总结一下大致流程如下:
(1)存储过程Proc_msg_receiver_friend从eventqueue表选择未处理的状态,联合friend表的接收者,对发送者的每一个好友插入冗余表feed_broadcast,然后处理eventqueue消息队列表,更改动态消息状态为已分发。
   (2)调用事件event_msg_cleaner每一个小时执行一次,删除队列表中已处理消息调度。
   (3)使用作业管理系统定时调用存储过程来分发。
   (4)动态不是实时的,而是由服务器决定是否分发给接收者,动态全部进入消息队列,由调度程序决定是否分发和处理。
本方案优点:部署简单,前端代码几乎不用修改,仅使用mysql的内置存储过程和定时器即可完成大部分调度,服务器部署也很简单,容易迁移于多种语言之间。
缺点:对于大数据量,上千万的消息队列,本实现也就力不从心,这只是个暂时的替代方案,对于日百万级别下的应用经测试完全可以负担,数据量再大的话,就需要考虑更复杂的方案了。
本方案实时性不高,不能用于对实时性要求很高的场所,如微博。对于动态分发,邮件,小纸条,短信等实时性相对略低的应用可以采用。
 

顶(0)
踩(0)

您可能还会对下面的文章感兴趣:

最新评论