`
1998a
  • 浏览: 111580 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

memcached java client 源代码分析

    博客分类:
  • MINA
阅读更多

uniseraph 大牛写了一个用mina实现的memcached java client.最近正好在研究mina,来的早不如来的巧,现在跟大家分享一下我今天一天的劳动成果,望大家指教

该项目的基本思想是:使用一个IoSession池,当前默认是使用SimpleCircularQueue,借助于mina的机制,实现高并发请求的。目前只有set和get功能。现在从set开始分析。

在session建立之后,通过MemcachedClientHandler#sessionCreated(IoSession)在每个session中加入一个List和一个Map,在MemcachedClient中的set方法中构造SetCommand然后使用pool发送。MemcachedMutexIoSessionPool#send()方法可以选择是否使用异步调用,但是我感觉这里的异步调用有些问题,同步当然是没有问题,通过holder.getContext().await()就可以阻塞send方法等待结果。但是如果是异步方式的话,返回null,我感觉不妥,客户端得到null?如果是取得数据的话,是否可以返回一个MemcachedResponse的子类,但是想得到结果时阻塞呢?当然这只是我自己YY的。MemcachedMutexIoSessionPool#send()代码如下。

public MemcachedResponse send(MemcachedCommand message, boolean sync) {
		IoSession session = selectSession();
		RequestContextHolder holder = new RequestContextHolder();
		holder.getContext().setCommand(message);

		if (sync) {
			session.write(holder);
			try {
				holder.getContext().await();
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			if (!holder.getContext().isSuccess()) {
				if (log.isInfoEnabled())
					log.info(String.format("time out"));
			}
			return holder.getContext().getReponse();
		} else {
			session.write(holder);
			return null;
		}
	}
 

调用sessio.write(holder)后,通过await方法阻塞,但是MINA会去调用encoder对消息进行encode,这里使用的是CommandEncoder,代码如下:

public void encode(IoSession session, Object message,
			ProtocolEncoderOutput out) throws Exception {
		RequestContextHolder holder = (RequestContextHolder) message;
		IoBuffer buf = holder.getContext().getCommand().toBuffer(
				MemcachedConstants.DEFAULT_CHARSET);
		buf.setAutoExpand(true);
		buf.flip();
		ConcurrentHashMap<IoBuffer, RequestContext> encodedMessage2Message = (ConcurrentHashMap<IoBuffer, RequestContext>) session
				.getAttribute("encodedMessage2Message");
		RequestContext newItem = encodedMessage2Message.putIfAbsent(buf,
				holder.getContext());

		if (newItem == null) {
			out.write(buf);

		} else {
			if(log.isDebugEnabled()){
				log.debug("reusing command.");
			}
			holder.setContext(newItem);

		}
	}

  首先将command转换成IoBuffer,然后把IoBuffer和ReuestContext对于放入session中的map中。

消息发送完毕后,MINA的filter机制开始工作,AppendFilter中的messageSent方法拦截session,将刚刚session中map存储的RequestContext取出来,放入到session中的队列中。在memcached通讯层分析 中提到过memcache在同一个socket上,命令处理是按顺序的。根据这个就可以按照先后顺序将RequestContext加入到队列中就可以。

AppenderFilter代码:

public void messageSent(NextFilter nextFilter, IoSession session,
			WriteRequest writeRequest) throws Exception {
		Object obj = writeRequest.getMessage();
		if (obj instanceof IoBuffer) {
			IoBuffer buf = (IoBuffer) obj;
			if (buf.limit() != 0) {
				ConcurrentHashMap<IoBuffer, RequestContext> encodedMessage2Message = SessionUtil
						.getEncodedMessage2Message(session);

				LinkedBlockingQueue<RequestContext> queue = SessionUtil
						.getCommandQueue(session);
				RequestContext item = encodedMessage2Message.remove(buf);
				
				if (item == null) {
					log.error("Can't find the command :" + buf + "in the map");
					log.error("map :" + encodedMessage2Message.toString());
					throw new IllegalStateException();
				}
				queue.add(item);

			}
		}
		nextFilter.messageSent(session, writeRequest);
	}

 然后就是response的decode,这里使用LinedReponseDecoder,在decode时,先使用TextlineDecoder首先进行decode,这里有一个hint,就是decode方法需要带一个ProtocolDecoderOutput的参数,其实ProtocolDecoderOutput只是decode方法的一个callback,这里,首先从session中取出先前存储的list,在TextlineDecoder decode完成后,将结果加到context中。具体看代码:

public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
			throws Exception {

		final List<String> context = (List<String>) session
				.getAttribute(MemcachedConstants.LINES_CONTEXT);

		decoder.decode(session, in, new ProtocolDecoderOutput() {

			public void flush() {
			/**
			 * ignore
			 */

			}

			public void write(Object message) {
				context.add(message.toString());

			}
		});

 最后decode方法再根据先前的先来先服务原则,解析对于的response就可以了。这部分比较简单了。

 

总结这个项目,重点是借助于mina的处理机制,对mina有很深了解的看这个项目应该不难,主要是思维比较好。

学习中……

 

分享到:
评论

相关推荐

    Memcached-Client源码

    memcached在Java客户端调用时的源码。memcached源码中有一个bug,ip的传参形式为192.168.1.1:12301,到了sockiopool.java中用split(";")来解析ip和端口。这种情况在ipv6的环境下是行不通的。因为v6地址是[xxxx:xx:...

    memcached java client

    java_memcached-release_2.5.2.zip memcache的java客户端分发包,包括源代码和jar包

    JAVA上百实例源码以及开源项目源代码

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    Spring memcached 源码

    下载压缩包,解压查看里面的 OrgManagerServiceImpl 类的方法,已经测试过Spring集成配置Memcached 成功的

    java memcahe 客户端 2.6.1

    这次,Memcached Java Client推出的2.6.1发布版是基于全新的performance分支,具有如下重大改进: 较之老版本,在性能上有300%左右的提升; 兼容老版本,用户无须修改自己的源代码; 支持多个memcached协议,包括...

    基于memcached client for java的cache封装

    NULL 博文链接:https://guazi.iteye.com/blog/1071646

    JAVA上百实例源码以及开源项目

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    Memcached-Java-Client-3.0.2.jar中文-英文对照文档.zip

    源代码下载地址:【***-sources.jar下载地址(官方地址+国内镜像地址).txt】 # 本文件关键字: 中文-英文对照文档,中英对照文档,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,...

    nginx+tomcat+memcached负载均衡集群搭建许jar包大全

    3)msm源码中的lib包版本太低:spymemcached需要使用2.10.2,否则启动tomcat报错:java.lang.NoSuchMethodError:net.spy.memcached.MemcachedClient.set(Ljava/lang/String;ILjava/lang/Object;)Lnet/spy/memcached...

    java开源包8

    JOpenID是一个轻量级的OpenID 2.0 Java客户端,仅50KB+(含源代码),允许任何Web网站通过OpenID支持用户直接登录而无需注册,例如Google Account或Yahoo Account。 JActor的文件持久化组件 JFile JFile 是 JActor ...

    java开源包1

    JOpenID是一个轻量级的OpenID 2.0 Java客户端,仅50KB+(含源代码),允许任何Web网站通过OpenID支持用户直接登录而无需注册,例如Google Account或Yahoo Account。 JActor的文件持久化组件 JFile JFile 是 JActor ...

    java开源包10

    JOpenID是一个轻量级的OpenID 2.0 Java客户端,仅50KB+(含源代码),允许任何Web网站通过OpenID支持用户直接登录而无需注册,例如Google Account或Yahoo Account。 JActor的文件持久化组件 JFile JFile 是 JActor ...

    java开源包4

    JOpenID是一个轻量级的OpenID 2.0 Java客户端,仅50KB+(含源代码),允许任何Web网站通过OpenID支持用户直接登录而无需注册,例如Google Account或Yahoo Account。 JActor的文件持久化组件 JFile JFile 是 JActor ...

    java开源包101

    JOpenID是一个轻量级的OpenID 2.0 Java客户端,仅50KB+(含源代码),允许任何Web网站通过OpenID支持用户直接登录而无需注册,例如Google Account或Yahoo Account。 JActor的文件持久化组件 JFile JFile 是 JActor ...

    java开源包11

    JOpenID是一个轻量级的OpenID 2.0 Java客户端,仅50KB+(含源代码),允许任何Web网站通过OpenID支持用户直接登录而无需注册,例如Google Account或Yahoo Account。 JActor的文件持久化组件 JFile JFile 是 JActor ...

    java开源包6

    JOpenID是一个轻量级的OpenID 2.0 Java客户端,仅50KB+(含源代码),允许任何Web网站通过OpenID支持用户直接登录而无需注册,例如Google Account或Yahoo Account。 JActor的文件持久化组件 JFile JFile 是 JActor ...

    java开源包9

    JOpenID是一个轻量级的OpenID 2.0 Java客户端,仅50KB+(含源代码),允许任何Web网站通过OpenID支持用户直接登录而无需注册,例如Google Account或Yahoo Account。 JActor的文件持久化组件 JFile JFile 是 JActor ...

    java开源包5

    JOpenID是一个轻量级的OpenID 2.0 Java客户端,仅50KB+(含源代码),允许任何Web网站通过OpenID支持用户直接登录而无需注册,例如Google Account或Yahoo Account。 JActor的文件持久化组件 JFile JFile 是 JActor ...

    java开源包3

    JOpenID是一个轻量级的OpenID 2.0 Java客户端,仅50KB+(含源代码),允许任何Web网站通过OpenID支持用户直接登录而无需注册,例如Google Account或Yahoo Account。 JActor的文件持久化组件 JFile JFile 是 JActor ...

Global site tag (gtag.js) - Google Analytics