分布式服务框架:用于实现服务复用,由RPC框架、注册中心和监控中心三部分组成,一般部署在应用层和数据层之间。其中RPC框架是最重要的部分,由服务端和客户端两部分组成:服务端暴露服务,并将服务信息注册到注册中心;客户端调用服务时,先从注册中心获取服务信息,然后与服务端进行交互。实现一个高并发、高可用、高性能的RPC框架涉及很多技术点。
下面是一个简单的RPC框架示例(showcase),不涉及注册中心和监控中心:服务端启动时通过export方法暴露服务,客户端启动时通过refer方法引用服务。
//暴露服务:
package com.mcg.rpc.server;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
/**
 * Minimal RPC provider: publishes one service object on a TCP port and answers
 * every incoming connection with the outcome of a single reflective method call.
 *
 * <p>Request, in the order it is read from the connection: the method name
 * ({@code readUTF}), the parameter types ({@code Class<?>[]}) and the arguments
 * ({@code Object[]}). Response: exactly one serialized object, either the
 * method's return value or the {@code Throwable} raised while serving the call.
 *
 * <p>NOTE(security): requests are decoded with Java native deserialization
 * ({@code ObjectInputStream.readObject}). Only expose the port to trusted peers.
 */
public class Provider
{
    /**
     * Exposes {@code instance} on {@code port} and serves requests, one new thread
     * per accepted connection. This method does not return normally: it keeps
     * accepting connections in an endless loop, and failures while accepting or
     * serving a connection are printed and do not stop the loop.
     *
     * @param instance the service object whose public methods are invoked; must not be null
     * @param port     the TCP port to listen on, 0 to 65535 (0 lets the system pick a port)
     * @throws IllegalArgumentException if {@code instance} is null or {@code port} is out of range
     * @throws Exception                if the server socket cannot be opened
     */
    public static void export(final Object instance, int port) throws Exception
    {
        if (instance == null)
            throw new IllegalArgumentException("service is null");
        if (port < 0 || port > 65535)
            throw new IllegalArgumentException(port + " is invalid");
        ServerSocket serverSocket = new ServerSocket(port);
        try {
            for (;;) {
                try {
                    final Socket socket = serverSocket.accept();
                    new Thread(new Runnable() {
                        public void run() {
                            try {
                                handle(instance, socket);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }).start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } finally {
            // Only reached when something other than an Exception (an Error) escapes the loop.
            serverSocket.close();
        }
    }

    /**
     * Serves one request on {@code socket}: reads the call description, invokes the
     * method on {@code instance}, writes back the result or the failure, then closes
     * the streams and the socket.
     */
    private static void handle(Object instance, Socket socket) throws Exception
    {
        try {
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            try {
                String methodName = objectInputStream.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) objectInputStream.readObject();
                Object[] arguments = (Object[]) objectInputStream.readObject();
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                try {
                    Method method = instance.getClass().getMethod(methodName, parameterTypes);
                    Object result = method.invoke(instance, arguments);
                    objectOutputStream.writeObject(result);
                } catch (java.lang.reflect.InvocationTargetException e) {
                    // Method.invoke wraps whatever the service method threw; send the
                    // service's own exception so the caller sees the real failure.
                    Throwable cause = e.getCause();
                    objectOutputStream.writeObject(cause != null ? cause : e);
                } catch (Throwable t) {
                    // Lookup/access failures and serialization problems are reported as-is.
                    objectOutputStream.writeObject(t);
                } finally {
                    objectOutputStream.close();
                }
            } finally {
                objectInputStream.close();
            }
        } finally {
            socket.close();
        }
    }
}
//引用服务:
package com.mcg.rpc.client;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
/**
 * Minimal RPC consumer: builds a dynamic proxy for a service interface whose
 * method calls are carried out over a TCP connection.
 *
 * <p>For each call the proxy opens a new socket, writes the method name
 * ({@code writeUTF}), the parameter types and the arguments, then reads one
 * object back. If that object is a {@code Throwable} it is rethrown to the
 * caller; otherwise it is returned as the call's result.
 */
public class Consumer {
    /**
     * Creates a proxy implementing {@code serviceInterface} that forwards every
     * interface method call to {@code host:port}. No connection is opened until a
     * method is actually called on the proxy.
     *
     * <p>{@code equals}, {@code hashCode} and {@code toString} are answered locally
     * (identity semantics) and never touch the network, so the proxy can be logged
     * or stored in collections without a reachable server.
     *
     * @param serviceInterface the interface the proxy implements; must not be null
     * @param host             the host to connect to on each call
     * @param port             the TCP port to connect to, 1 to 65535
     * @param <T>              the service interface type
     * @return a proxy instance implementing {@code serviceInterface}
     * @throws IllegalArgumentException if {@code serviceInterface} is null or not an
     *                                  interface, or {@code port} is out of range
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> serviceInterface, final String host, final int port)
    {
        // Fail here rather than on the first remote call.
        if (serviceInterface == null)
            throw new IllegalArgumentException("service interface is null");
        if (port <= 0 || port > 65535)
            throw new IllegalArgumentException(port + " is invalid");
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new InvocationHandler()
        {
            public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                // hashCode/equals/toString reach the handler with Object as declaring class.
                if (method.getDeclaringClass() == Object.class) {
                    String name = method.getName();
                    if ("equals".equals(name))
                        return Boolean.valueOf(proxy == arguments[0]);
                    if ("hashCode".equals(name))
                        return Integer.valueOf(System.identityHashCode(proxy));
                    return serviceInterface.getName() + "@" + host + ":" + port;
                }
                Socket socket = new Socket(host, port);
                try {
                    ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                    try {
                        output.writeUTF(method.getName());
                        output.writeObject(method.getParameterTypes());
                        output.writeObject(arguments);
                        ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                        try {
                            Object result = input.readObject();
                            // A Throwable in the response means the call failed; rethrow it.
                            if (result instanceof Throwable) {
                                throw (Throwable) result;
                            }
                            return result;
                        } finally {
                            input.close();
                        }
                    } finally {
                        output.close();
                    }
                } finally {
                    socket.close();
                }
            }
        });
    }
}
//服务接口:
package com.mcg.rpc.service;
/**
 * Service contract of the demo: a single greeting operation.
 */
public interface HelloService {

    /**
     * Greets the given name.
     *
     * @param name the name to greet
     */
    void say(String name);
}
//服务实现:
package com.mcg.rpc.service;
/**
 * Implementation of {@link HelloService} that writes the greeting to standard output.
 */
public class HelloServiceImpl implements HelloService {

    /**
     * Prints {@code hello:} followed by {@code name} on its own line.
     *
     * @param name the name to greet
     */
    public void say(String name) {
        final String greeting = "hello:" + name;
        System.out.println(greeting);
    }
}
//测试:
package com.mcg.rpc.server;
import com.mcg.rpc.service.HelloService;
import com.mcg.rpc.service.HelloServiceImpl;
/**
 * Server-side entry point of the demo: exports a {@code HelloServiceImpl} on {@link #PORT}.
 */
public class ServerTest {

    /** TCP port the demo service is exported on. */
    private static final int PORT = 1234;

    /**
     * Creates the service implementation and passes it to {@code Provider.export}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if {@code Provider.export} fails
     */
    public static void main(String[] args) throws Exception {
        final HelloService helloService = new HelloServiceImpl();
        Provider.export(helloService, PORT);
    }
}
package com.mcg.rpc.client;
import com.mcg.rpc.service.HelloService;
/**
 * Client-side entry point of the demo: obtains a {@code HelloService} proxy and calls it once.
 */
public class ClientTest {

    /** Host passed to {@code Consumer.refer}. */
    private static final String HOST = "127.0.0.1";

    /** TCP port passed to {@code Consumer.refer}. */
    private static final int PORT = 1234;

    /**
     * Gets a {@code HelloService} proxy from {@code Consumer.refer} and calls
     * {@code say("mcg")} on it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final HelloService remoteHello = Consumer.refer(HelloService.class, HOST, PORT);
        remoteHello.say("mcg");
    }
}
//结果:
hello:mcg