nodejs+redis写的订阅分发(已抛弃版本)

2013-05-13
下面是我前段时间用nodejs监听redis的pub/sub写的订阅脚本
后来由于持久化及系统资源耗费严重(cpu 50% 内存500m)抛弃掉了,放这里给大家围观下
如果有好办法请告诉我,谢过</p>

设计思路如下
1.当有人在redis下发布一个消息的同时发布个广播
2.分发中转程序收到广播后将消息复制成多份扔到对应订阅者的接受队列内,并对各个队列发送订阅广播
3.redis下的订阅端就会接收到事件,并把事件用http发送一个通知接口让对方来取

中间分发用的开源的qdis后来想慢慢替换掉一直没来得及替换。

var redis = require(“redis”);
var subscription_client = redis.createClient();
var regular_client = redis.createClient();
var http=require(“http”);
//var querystring=require(“querystring”);
var url=require(“url”);

//current send process count
var currentSend=0;
var starttime = process.hrtime();
var configlistobj={
‘pub’:{
‘sub1′:’http://xxxx.com/test.php?p=123’,
‘sub2′:’http://xxxx.com/test.php?p=456’
}
};
//time report
setInterval(function () {
var diff = process.hrtime(starttime);
console.log(‘benchmark took %d seconds and %d nanoseconds’,diff[0],diff[1]);}, 1000);

//send http request
function httprequest(urlpath,dataString)
{
currentSend++;
//parse theurl
urlpath =url.parse(urlpath);
//defaultport is 80
if(urlpath[“port”]==””)
{
urlpath[“port”]=80;
}

var headers= {
‘content-type’: ‘application/x-www-form-urlencoded’,
‘Content-Length’: dataString.length
};

var options= {
host: urlpath[“hostname”],
port: urlpath[“port”],
path: urlpath[“path”],
method: ‘POST’,
headers: headers
};

//console.log(JSON.stringify(options));

var req =http.request(options, function(res) {
//console.log(‘STATUS: ‘ + res.statusCode);
//console.log(‘HEADERS: ‘ + JSON.stringify(res.headers));
if(res.statusCode!=200)
{
console.log(“fuck,get error”);
return;
}
res.setEncoding(‘utf8’);
res.on(‘data’, function (chunk) {
console.log(‘BODY: ‘ + chunk);
currentSend–;
console.log(‘Children:’+currentSend);

});
});

// errorhandle
req.on(‘error’,function(e){
console.log(‘error’);
});
;
req.setTimeout(1,function(socket){});
req.write(dataString);
//console.log(dataString);
req.end();
}

function processOfflineData(channel)
{
regular_client.rpop( channel, function(err,res){
if(res != null)
{
httprequest(configlistobj[“pub”][“sub1”],res);
processOfflineData(channel);
}
});
}

subscription_client.on( ‘message’, function( channel, message ){

//thechannel name is the same name as the list
regular_client.rpop( channel, function(err,res){
httprequest(configlistobj[“pub”][“sub1”],res);
console.log(currentSend);
});

} );

console.log(“sending the Offline info”);
processOfflineData(“sub”);
//start process
subscription_client.subscribe(‘sub’);