什么zookeeper?
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
ZooKeeper包含一个简单的原语集, 提供Java和C的接口。
ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有java和C两个版本,选举只有Java版本。
zookeeper的原理:
ZooKeeper是以Fast Paxos算法为基础的,Paxos算法存在活锁的问题,即当有多个proposer交错提交时,有可能互相排斥导致没有一个proposer能提交成功,而Fast Paxos作了一些优化,通过选举产生一个leader (领导者),只有leader才能提交proposer,具体算法可见Fast Paxos。因此,要想弄懂ZooKeeper首先得对Fast Paxos有所了解。 [3]
ZooKeeper的基本运转流程:
1、选举Leader。
2、同步数据。
3、选举Leader过程中算法有很多,但要达到的选举标准是一致的。
4、Leader要具有最高的执行ID,类似root权限。
5、集群中大多数的机器得到响应并接受选出的Leader。
为什么zookeeper适合作为注册中心?
Zookeeper的数据模型很简单,有一系列被称为ZNode的数据节点组成,与传统的磁盘文件系统不同的是,zk将全量数据存储在内存中,可谓是高性能,而且支持集群,可谓高可用,另外支持事件监听。这些特点决定了zk特别适合作为注册中心(数据发布/订阅)。
下面介绍两种zookeeper客户端的实现,第一种使用zookeeper自带的原生客户端,第二种使用Apache Curator封装后的zookeeper客户端,第一种接近zookeeper底层的源码,它底层也是用了这些方法,用户使用起来较繁琐,推荐使用第二种,Apache Curator封装后简化了用户的使用。
maven引入:
org.apache.zookeeper zookeeper 3.4.6 org.apache.curator curator-framework 2.9.0
先介绍第一种zookeeper自带的原生客户端:
package com.wenbing.zookeeper;import org.apache.zookeeper.*;//原生zookeeper客户端使用public class zookeeperSelfTest { private static final String connectString = "192.168.159.128:2181,192.168.159.133:2181,192.168.159.134:2181"; private static final int sessionTimeout = 3000; public static void main(String[] args) throws Exception {// 创建一个与服务器的连接,需要(服务器的ip+端口)(session过期时间)(Watcher监听注册) ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { // 监听所有被触发的事件 @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听来了:" + watchedEvent.toString()); } }); System.out.println("OK!");// 创建一个目录节点 /** * CreateMode: * PERSISTENT(持续的,相对于EPHEMERAL,不会随着client的断开而消失 * PERSISTENT_SEQUENTIAL(持久的且带顺序的) * EPHEMERAL (短暂的,生命周期依赖于client session) * EPHEMERAL_SEQUENTIAL (短暂的,带顺序的) */ if (zk.exists("/test01", false) == null) { zk.create("/test01", "goodboy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }// 创建一个子目录节点 zk.create("/test01/test01", "goodgirl".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(zk.getData("/test01", false,null).toString());// 取出子目录节点列表 System.out.println(zk.getChildren("/test01", true));// 创建另一个子目录节点 zk.create("/test01/test02", "goodgirl2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(zk.getChildren("/test01", true));// 修改子目录节点数据 zk.setData("/test01/test01", "goodboy/boy02".getBytes(), -1); byte[] datas = zk.getData("/test01/test01", false, null); String str = new String(datas, "UTF-8"); System.out.println(str);// 删除整个子目录 -1代表version版本号,-1是删除所有版本 zk.delete("/test01/test01", -1); zk.delete("/test01/test02", -1); zk.delete("/test01", -1); System.out.println(str); Thread.sleep(15000); zk.close(); System.out.println("OK!结束!"); }}
Apache Curator封装的zookeeper客户端使用:
package com.wenbing.zookeeper;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.api.CuratorWatcher;import org.apache.curator.retry.RetryNTimes;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.ZooDefs;import java.util.List;//Apache Curator封装的zookeeperk客户端使用public class CuratorTest {// psvm快捷键main方法生成 public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.159.128:2181", new RetryNTimes(10, 5000));// 连接 client.start();// 获取子节点,顺便监控子节点 Listchildren = client.getChildren().usingWatcher(new CuratorWatcher() { @Override public void process(WatchedEvent watchedEvent) throws Exception { System.out.println("监控:"+watchedEvent); } }).forPath("/"); System.out.println(children);// 创建节点 String result = client.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/test", "Data".getBytes()); System.out.println(result);// 设置节点数据 client.setData().forPath("/test", "111".getBytes()); client.setData().forPath("/test", "222".getBytes());// 删除节点 System.out.println(client.checkExists().forPath("/test"));// client.delete().withVersion(-1).forPath("/test"); System.out.println(client.checkExists().forPath("/test")); Thread thread = new Thread(new Runnable(){ @Override public void run(){ } }); thread.sleep(Long.MAX_VALUE); client.close(); System.out.println("OK!"); }}