一个例子
同样是typesafe的经典例子。
例子提供的服务是传输文本。当文本发给frontend节点,它会委派backend节点,backend执行转化任务,把结果返回给原来的客户端。
新的backend节点和frontend节点,都可以动态地在cluster上增减。
message
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface TransformationMessages {
public static class TransformationJob implements Serializable {
private final String text;
//......
}
public static class TransformationResult implements Serializable {
private final String text;
//.....
}
public static class JobFailed implements Serializable {
private final String reason;
private final TransformationJob job;
//....
}
public static final String BACKEND_REGISTRATION = "BackendRegistration";
}
backend处理逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class TransformationBackend extends UntypedActor {
Cluster cluster = Cluster.get(getContext().system());
//...
@Override
public void onReceive(Object message) {
if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
getSender().tell(new TransformationResult(job.getText().toUpperCase()),
getSelf());
} else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
register(mUp.member());
} else {
unhandled(message);
}
}
void register(Member member) {
if (member.hasRole("frontend"))
getContext().actorSelection(member.address() + "/user/frontend").tell(
BACKEND_REGISTRATION, getSelf());
}
}
backend订阅了cluster的事件,检测frontend节点,还会发一条消息告诉fontend可以使用了。
frontend节点接收用户的任务,扔给注册好的backend节点。
frontend节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TransformationFrontend extends UntypedActor {
List<ActorRef> backends = new ArrayList<ActorRef>();
int jobCounter = 0;
@Override
public void onReceive(Object message) {
if ((message instanceof TransformationJob) && backends.isEmpty()) {
TransformationJob job = (TransformationJob) message;
getSender().tell(
new JobFailed("Service unavailable, try again later", job),
getSender());
} else if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
jobCounter++;
backends.get(jobCounter % backends.size())
.forward(job, getContext());
} else if (message.equals(BACKEND_REGISTRATION)) {
getContext().watch(getSender());
backends.add(getSender());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
backends.remove(terminated.getActor());
} else {
unhandled(message);
}
}
}
frontend用List保存了backend的actor位置,有需要的时候就轮循发给backend。
getSender 本次收到消息的上游,一般用来回复消息。
getContext 本actor的上下文。
getContext().watch DeathWatch,相当于watch了谁,谁有啥公开动作就会告诉我,包括挂了之类的。
ActorRef.forward与tell、ask的区别,性能最好的是tell,发完就走。ask是发完等Future,要等的话性能是个问题。forward用于从一个actor转发消息给另一个actor,原始的sender信息会被保留,在做路由、负载均衡、备份时非常有用。
运行TransformationApp
sample.cluster.transformation.TransformationApp 启动三个backend 2551 2552 0为一个cluster,启动一个fronend。
frontend每5秒会收到一次任务,接收成功后print代码,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
system.scheduler().schedule(interval, interval, new Runnable() {
public void run() {
ask(frontend,
new TransformationJob("hello-" + counter.incrementAndGet()),
timeout).onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) {
System.out.println(result);
}
}, ec);
}
}, ec);
frontend节点中,收到job的时候会去检查backend注册数是否可用了,如果有可用的就forward任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void onReceive(Object message) {
if ((message instanceof TransformationJob) && backends.isEmpty()) {
TransformationJob job = (TransformationJob) message;
getSender().tell(
new JobFailed("Service unavailable, try again later", job),
getSender());
} else if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
jobCounter++;
backends.get(jobCounter % backends.size())
.forward(job, getContext());
} else if (message.equals(BACKEND_REGISTRATION)) {
getContext().watch(getSender());
backends.add(getSender());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
backends.remove(terminated.getActor());
} else {
unhandled(message);
}
}
1
2
3
4
5
void register(Member member) {
if (member.hasRole("frontend"))
getContext().actorSelection(member.address() + "/user/frontend").tell(
BACKEND_REGISTRATION, getSelf());
}
解析:backend订阅了memberUp事件,所以在cluster中如果有memberUp了,都会执行上述代码。
actorSelection是根据地址进行lookup,返回一个ActorSelection,可以当成本地的actor一样tell。
代码
原创文章如转载,请注明:转载自五四陈科学院 [http://www.54chen.com ]
捐赠说明