pubsub-vs-observer

面试官问,发布订阅和观察者有啥区别?

一脸懵逼?仔细想想,不难。且听娓娓道来。

什么是发布订阅

  • 你想知道某事发生,就需要实时监控着,时不时去看看,到底发没发生?
  • 但这很傻,到底应该多久看一次呢?而且总去看也不是事儿,所以你问我能不能主动告诉你呢?
  • 答案终于呼之欲出:能啊,你订阅我,我就给你发布!

每个人都用过!

  • js 的事件模型其实就是发布订阅!事件都是不一定啥时发生的,所以我们想要监控事件的发生,就要提前写好相应的回调,相当于订阅了这个事件的发生。而 js 引擎就负责实现其中逻辑。最终事件由触发者来触发,这里可以是真实事件,也可以是代码模拟出的。所以发布订阅也可称为自定义事件。

最简单的发布订阅

  • A 想知道 B 是否发生,很简单,A 订阅 B,B 发布即可。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// B为发布者
const B = {
cbList: [], // 需把所有订阅者想做的事都存起来
onSub(cb) {
// 被订阅时的动作
this.cbList.push(cb);
},
pub() {
// 发布时依次执行列表里的回调
for (let index = 0; index < this.cbList.length; index++) {
const cb = this.cbList[index];
cb.apply(this, arguments);
}
},
};
// A为订阅者,诉求很简单,告诉我!
const A = {
tellMe(msg) {
console.log("tellMe", msg);
},
tellMeAgain(msg) {
console.log("tellMeAgain", msg);
},
};
B.onSub(A.tellMe); // A订阅了B
B.onSub(A.tellMeAgain); // A又订阅了B
B.pub("我发生了");

两个小问题

  • 现在 A 只知道 B 发生了,其他关于 B 的事情一概不知。假设还想知道 B 结束了,咋办?
  • A 一旦订阅了 B,再想反悔不想订阅了,好像做不到。
  • 所以,上述最简单的例子可以再改改,加个订阅的类型,以及支持取消订阅。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// B为发布者
const B = {
cbObj: {}, // 需把所有订阅者想做的事都存起来
onSub(type, cb) {
// 被订阅时的动作
if (!this.cbObj[type]) {
this.cbObj[type] = [];
}
this.cbObj[type].push(cb);
},
onCancelSub(type, cb) {
if (!this.cbObj[type]) {
return false;
}
for (let len = this.cbObj[type].length; len > 0; len--) {
const fn = this.cbObj[type][len - 1];
if (fn === cb) {
this.cbObj[type].splice(len - 1, 1);
}
}
},
pub(...rest) {
// 发布时依次执行列表里的回调
const type = rest.shift();
for (let index = 0; index < this.cbObj[type].length; index++) {
const cb = this.cbObj[type][index];
cb.apply(this, rest);
}
},
};
// A为订阅者,诉求很简单,告诉我!
const A = {
tellMe(msg) {
console.log("tellMe", msg);
},
tellMeAgain(msg) {
console.log("tellMeAgain", msg);
},
};
B.onSub("start", A.tellMe); // A订阅了B
B.onCancelSub("start", A.tellMe); // A取消订阅了B
B.onSub("over", A.tellMeAgain); // A又订阅了B
B.pub("start", "我发生了");
B.pub("over", "我结束了");
  • 至此,其实已经实现了一般意义上的观察者模式,发现了吗?最简单的发布订阅就是观察者模式!
  • 那标准意义上的发布订阅还有什么区别呢?答案马上揭晓
  • 上述的例子有点死板,因为无论谁订阅,订阅谁,总是要直接调用相关对象的方法,这就形成了深耦合,不利于扩展,所以能否做一些统一的封装工作,帮助我们管理这些订阅发布的动作?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// B改为通用的发布者
const PUB = {
cbObj: {}, // 需把所有订阅者想做的事都存起来
onSub(type, cb) {
// 被订阅时的动作
if (!this.cbObj[type]) {
this.cbObj[type] = [];
}
this.cbObj[type].push(cb);
},
onCancelSub(type, cb) {
if (!this.cbObj[type]) {
return false;
}
for (let len = this.cbObj[type].length; len > 0; len--) {
const fn = this.cbObj[type][len - 1];
if (fn === cb) {
this.cbObj[type].splice(len - 1, 1);
}
}
},
pub(...rest) {
// 发布时依次执行列表里的回调
const type = rest.shift();
for (let index = 0; index < this.cbObj[type].length; index++) {
const cb = this.cbObj[type][index];
cb.apply(this, rest);
}
},
};
// 部署通用版发布
var deployPub = function (obj) {
for (var i in PUB) {
obj[i] = PUB[i];
}
};
const B = {};
deployPub(B); //这样B就具有了发布功能
// 继续用A测试
const A = {
tellMe(msg) {
console.log("tellMe", msg);
},
tellMeAgain(msg) {
console.log("tellMeAgain", msg);
},
};
B.onSub("start", A.tellMe); // A订阅了B
B.onCancelSub("start", A.tellMe); // A取消订阅了B
B.onSub("over", A.tellMeAgain); // A又订阅了B
B.pub("start", "我发生了");
B.pub("over", "我结束了");
  • 再仔细观察一下,是不是貌似不需要把 B 每次都变成所谓的通用发布者,因为 A 订阅时,已经不关注订阅的对象是谁了,而只关注订阅的事件类型。
  • 所以,可以进一步理解为发布订阅模式就是这样一种模式,即通过“中间人”的角色,完成所有类型事件的订阅。这个中间人只负责收集订阅者,和向所有订阅者发布消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// PUB即为中间人
const PUB = {
cbObj: {}, // 需把所有订阅者想做的事都存起来
onSub(type, cb) {
// 被订阅时的动作
if (!this.cbObj[type]) {
this.cbObj[type] = [];
}
this.cbObj[type].push(cb);
},
onCancelSub(type, cb) {
if (!this.cbObj[type]) {
return false;
}
for (let len = this.cbObj[type].length; len > 0; len--) {
const fn = this.cbObj[type][len - 1];
if (fn === cb) {
this.cbObj[type].splice(len - 1, 1);
}
}
},
pub(...rest) {
// 发布时依次执行列表里的回调
const type = rest.shift();
for (let index = 0; index < this.cbObj[type].length; index++) {
const cb = this.cbObj[type][index];
cb.apply(this, rest);
}
},
};
// 继续用A测试
const A = {
tellMe(msg) {
console.log("tellMe", msg);
},
tellMeAgain(msg) {
console.log("tellMeAgain", msg);
},
};
PUB.onSub("start", A.tellMe); // A订阅了start
PUB.onCancelSub("start", A.tellMe); // A取消订阅了start
PUB.onSub("over", A.tellMeAgain); // A又订阅了over
PUB.pub("start", "我发生了");
PUB.pub("over", "我结束了");
  • 其实还可以再加上更高级的一些功能,例如支持离线消息,即发布时还没订阅,订阅晚于发布时间;还有支持命名空间等,因为总是把所有类型的事件放在一起,用一个字符串表示,难免最后会乱。最后是终极代码,直接照抄了原书作者的版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
var Event = (function () {
var global = this,
Event,
_default = "default";
Event = (function () {
var _listen,
_trigger,
_remove,
_slice = Array.prototype.slice,
_shift = Array.prototype.shift,
_unshift = Array.prototype.unshift,
namespaceCache = {},
_create,
find,
each = function (ary, fn) {
var ret;
for (var i = 0, l = ary.length; i < l; i++) {
var n = ary[i];
ret = fn.call(n, i, n);
}
return ret;
};
_listen = function (key, fn, cache) {
if (!cache[key]) {
cache[key] = [];
}
cache[key].push(fn);
};
_remove = function (key, cache, fn) {
if (cache[key]) {
if (fn) {
for (var i = cache[key].length; i >= 0; i--) {
if (cache[key][i] === fn) {
cache[key].splice(i, 1);
}
}
} else {
cache[key] = [];
}
}
};
_trigger = function () {
var cache = _shift.call(arguments),
key = _shift.call(arguments),
args = arguments,
_self = this,
ret,
stack = cache[key];
if (!stack || !stack.length) {
return;
}
return each(stack, function () {
return this.apply(_self, args);
});
};
_create = function (namespace) {
var namespace = namespace || _default;
var cache = {},
offlineStack = [], // 离线事件
ret = {
listen: function (key, fn, last) {
_listen(key, fn, cache);
if (offlineStack === null) {
return;
}
if (last === "last") {
offlineStack.length && offlineStack.pop()();
} else {
each(offlineStack, function () {
this();
});
}
offlineStack = null;
},
one: function (key, fn, last) {
_remove(key, cache);
this.listen(key, fn, last);
},
remove: function (key, fn) {
_remove(key, cache, fn);
},
trigger: function () {
var fn,
args,
_self = this;
_unshift.call(arguments, cache);
args = arguments;
fn = function () {
return _trigger.apply(_self, args);
};
if (offlineStack) {
return offlineStack.push(fn);
}
return fn();
},
};
return namespace
? namespaceCache[namespace]
? namespaceCache[namespace]
: (namespaceCache[namespace] = ret)
: ret;
};
return {
create: _create,
one: function (key, fn, last) {
var event = this.create();
event.one(key, fn, last);
},
remove: function (key, fn) {
var event = this.create();
event.remove(key, fn);
},
listen: function (key, fn, last) {
var event = this.create();
event.listen(key, fn, last);
},
trigger: function () {
var event = this.create();
event.trigger.apply(this, arguments);
},
};
})();
return Event;
})();
  • 最后总结一下,观察者模式是这种模式的最初思想,为了实现不同对象间的解耦合。
  • 自此基础上发展出了发布订阅模式,通过中间人的角色,统一来管理所有消息的订阅和发布。
  • 后续的功能,例如离线消息、命名空间等,更是为这一模式更加增光添彩如虎添翼。
  • 总之,这种设计模式的思想非常经典,也应用于很多大型复杂软件之中,掌握这种模式还是非常必要的。

最后补充

  • 其实还有一种写法是,订阅者和发布者的角色不局限于某一对象,即订阅者同时也可发布,发布者也可订阅。
  • 看起来更好理解,更像是字面意思上的观察者,互相订阅,也是互相观察。
  • 这种写法的最简版(观察者模式)就是互相观察,深度耦合;升级版也是使用中间人,统一管理订阅发布行为,从而进化为发布订阅模式。