golang-NSQ讲明白

版本 golang – 1.12.4 nsq-1.1.0.linux-amd64.go1.10.3.tar.gz 什么是NSQ 一句话讲NSQ是一个简单队列,类似于java经常使用的activeMQ或者RocketMQ,一般在同步分离成异步,发送消息和接受消息解耦的地方使用到。 NSQ有以下特性: 支持拓扑的高可用性和避免单点故障(SPOFs)。 更强的消息递交保证 为单次处理绑定着内存的足迹(通过把一些持久话的消息放入磁盘) 对生产者和消费者的配置进行极大的简化 提供直接的升级路径 提升效率 NSQ组成 NSQ由三个组件组成: nsqd 用于接收消息,排队消息,投递消息,我们的客户端(生产者,消费者)主要和它打交道 nsqlookupd 管理nsqd,nsqadmin拓扑信息。 我们的客户端(消费者)询问此组件来发现nsqd等 nsqadmin web UI 查询各种NSQ组件的信息,消息信息 NSQ使用步骤 启动nsqlookupd组件 启动nsqd并向nsqlookupd注册 启动nsqadmin并向nsqlookupd注册 生产者推送一个message到其中一个nsqd,并将此消息设置到一个topic里面 消费者向nsqlookupd询问指定topic的消息,nsqlookupd把有此topic的nsqd地址给到消费者 消费者建立channel和topic之间的订阅关系,通过channel向nsqd获取指定topic里面的消息 nsqd向所有订阅该topic的channel推送message, 然后其中一个消费者可以通过其中一个channel获取该topic的message 注意第4点,生产者为什么没有从nsqlookupd注册中心去寻找可以推送消息的nsqd呢?因为nsq的设计理念是将nsqd本地化,也就是说生产者直接推送消息到local-nsqd。这点和RocketMQ的设计理念不一样,RocketMQ的NameServer和nsqlookupd类似,但是设计上RocketMQ生产者会访问NameServer去寻找可用的MQ推送消息。 启动,注册过程: 生产者,消费者: 这就是nsq一个完整的使用流程,下面分别从客户端和代码两个方面介绍详细怎么使用 客户端使用 启动nsqlookup $ nsqlookupd 在另一个shell启动一个nsqd,并在lookupd注册,注意-broadcast-address一定是消费者可以访问的地址 $ nsqd --lookupd-tcp-address=127.0.0.1:4160 -broadcast-address="x.x.x.x" -tcp-address="0.0.0.0:4150" 启动nsqadmin,并在lookupd注册: $ nsqadmin --lookupd-http-address=127.0.0.1:4161 生产者生产一个message,并创建该消息的topic $ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test' 消费者通过lookupd查找对应的topic的nsq并绑定topic和channel,通过channel接受该topic的message $ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161 生产者生产更多消息 $ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test' $ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test' 可以打开nsaadmin查看所有详情http://127.0.0.1:4171/ ,同时也可以查看/tmp下面接收并写入的message (test.*.log) ...

June 24, 2019

golang实现协程安全的几种方式

版本 golang – 1.12.4 golang协程同步 1.channel - monitor goroutine var deposits = make(chan int) // send amount to deposit var balances = make(chan int) // receive balance func Deposit(amount int) { deposits <- amount } func Balance() int { return <-balances } func teller() { var balance int // balance is confined to teller goroutine for { select { case amount := <-deposits: balance += amount case balances <- balance: } } } func init() { go teller() // start the monitor goroutine } 2.channel - serial confinement ...

May 13, 2019

用golang实现mongodb数据库连接池-高级篇-协程安全

版本 golang – 1.12.4 mongodb – 4.0 go driver – 1.0.0 简介 在上一篇《用golang实现mongodb数据库连接池-基本篇》我们实现了mongodb的golang driver按序使用的基本版,但还需要进一步提升效率和高并发安全。本篇张实现高效率协程安全版。 data race 什么是data race,考虑如下代码: var balance int func Deposit(amount int){ balance = balance + amount} func Balance() int { return balance} //Alice: go func(){ bank.Deposit(200) // A1 fmt.Println("=", bank.Balance()) // A2 }() //Bob go bank.Deposit(100) // B 当alice和bob同时执行如上的操作,最后的存款有几种可能性? 根据直觉会有3种可能: alice first bob first alice/bob/alice 0 0 0 A1 200 B 100 A1 200 A2 “=200” A1 300 B 300 B 300 A2 “=300” A2 “=300” 这个结果最后存款都是剩余300似乎也没什么问题,但是这里还有第4种可能,那就是bob的存款操作发生在A1的balance + amount之后,但是在A1的balance =之前,那么会出现什么? ...

May 11, 2019

用golang实现mongodb数据库连接池-基本篇

版本 golang – 1.12.4 mongodb – 4.0 go driver – 1.0.0 简介 mongodb的数据库driver在官方文档里面明确写明所有的数据库连接需要自己建立和释放,而且建议尽量复用已有的建立,那么也就是说driver里面并未实现连接池的功能。在我们实际应用中就需要自己实现这套数据库连接池提升程序和数据库之间的执行效率。 设计思路 用一个数组来存放数据库连接的指针,并记录每一个指针两个状态: a.是否申请了数据库连接 b.这个连接是否已经给系统在使用中。举个例子就比较好理解了: 申请一个用于存放数据连接的数组,一开始空的什么都没有 程序需要一个数据库连接,连接池把数组第一个位置建立一个数据库连接,并把这个连接的状态置为:a.已申请 b.已给系统 程序使用完释放数据库连接,现在数据库指针状态为:a.已申请 b.未使用 程序需要新申请一个数据库连接,那么就回到了第2的状态。 核心代码 const( MAX_CONNECTION = 10 INITIAL_CONNECTION = 4 AVAILABLE = false USED = true ) /* 代码取了一个巧,用实际存放数据库指针的大小ClientPool.size和mongodata.flag来表示上述a,b两个状态 如果mongodata.flag都为USED,那么需要新申请个数据库连接: size++ clientList: the client pool clientAvailable: the available flag, means the location and available flag in the client pool size: the size of allocated client pool <= MAX_CONNECTION */ type mongodata struct{ client *mongo.Client pos int flag bool } type ClientPool struct{ clientList [MAX_CONNECTION]mongodata size int } //create a new database connection to the pool func (cp *ClientPool) allocateCToPool(pos int) (err error){ cp.clientList[pos].client, err = Dbconnect() if err != nil { utils.Logger.SetPrefix("WARNING ") utils.Logger.Println("allocateCToPool - allocateCToPool failed,position: ", pos, err) return err } cp.clientList[pos].flag = USED cp.clientList[pos].pos = pos return nil } //apply a connection from the pool func (cp *ClientPool) getCToPool(pos int){ cp.clientList[pos].flag = USED } //free a connection back to the pool func (cp *ClientPool) putCBackPool(pos int){ cp.clientList[pos].flag = AVAILABLE } //program apply a database connection func GetClient() (mongoclient *mongodata, err error) { for i:=1; i<cp.size; i++ { if cp.clientList[i].flag == AVAILABLE{ return &cp.clientList[i], nil } } if cp.size < MAX_CONNECTION{ err = cp.allocateCToPool(cp.size) if err != nil { utils.Logger.SetPrefix("WARNING ") utils.Logger.Println("GetClient - DB pooling allocate failed", err) return nil, err } pos := cp.size cp.size++ return &cp.clientList[pos], nil } else { utils.Logger.SetPrefix("WARNING ") utils.Logger.Println("GetClient - DB pooling is fulled") return nil, errors.New("DB pooling is fulled") } } //program release a connection func ReleaseClient(mongoclient *mongodata){ cp.putCBackPool(mongoclient.pos) } 以上就是核心代码实现,但是这个代码有一个问题,就是在高并发下并非协程安全,这个留在下一篇《用golang实现mongodb数据库连接池-高级篇-协程安全》来优化。 ...

May 10, 2019

about quality

The company invited me to do a training crouse about the software’s quality, it’s an old topic every project book talk about it, every quality master discuss it, the cmmi and agile process have such a huge practices about it, what should i do about it? maybe i just list what i thought is important for the quality and what’s wrong with our modern software development process. the passion i think most of us going to the industry is because we love this, we love the feel of change something through the finger.if you have this feeling, then you are good damn of it ...

May 16, 2018