KafkaEagle源码解读

1.概述

在《Kafka 消息监控 - Kafka Eagle》一文中,简单的介绍了 Kafka Eagle这款监控工具的作用,截图预览,以及使用详情。今天笔者通过其源码来解读实现细节。目前该项目已托管于 Github 之上,作者编写了使用手册,告知使用者如何安装,部署,启动该系统。但对于实现的细节并未在参考手册中详细指出。这里,笔者通过本篇博文,来详细解读其实现细节。相关资料文献地址如下所示:

2.内容

截止到版本 Kafka Eagle v1.1.1 支持监控0.8.2.x(存放消费信息于Zookeeper)以及 0.10.x(存放消费信息于Kafka的topic中)。对于前者,从Zookeeper中获取消息信息,难度不大,编写Zookeeper客户端实现代码即可,该版本在Zookeeper下的存储结构树如下图所示:

KafkaEagle源码解读

对于实现细节,可使用ZkUtils工具类来获取相关数据,以获取消费信息为例,代码如下所示:

/** Obtaining kafka consumer information from zookeeper. */
    public Map<String, List<String>> getConsumers(String clusterAlias) {
        ZkClient zkc = zkPool.getZkClient(clusterAlias);
        Map<String, List<String>> consumers = new HashMap<String, List<String>>();
        try {
            Seq<String> subConsumerPaths = ZkUtils.getChildren(zkc, CONSUMERS_PATH);
            List<String> groups = JavaConversions.seqAsJavaList(subConsumerPaths);
            for (String group : groups) {
                String path = CONSUMERS_PATH + "/" + group + "/owners";
                if (ZkUtils.pathExists(zkc, path)) {
                    Seq<String> owners = ZkUtils.getChildren(zkc, path);
                    List<String> ownersSerialize = JavaConversions.seqAsJavaList(owners);
                    consumers.put(group, ownersSerialize);
                } else {
                    LOG.error("Consumer Path[" + path + "] is not exist.");
                }
            }
        } catch (Exception ex) {
            LOG.error(ex.getMessage());
        } finally {
            if (zkc != null) {
                zkPool.release(clusterAlias, zkc);
                zkc = null;
            }
        }
        return consumers;
    }

其他监控信息可以按照Zookeeper中结构树路径获取。如下图所示:

KafkaEagle源码解读

然而,对于新版本,官方默认将消费信息迁移到Kafka的topic中,这样原来的接口只能获取topic,broker等信息,对于消费的信息,我们需要从kafka中一个叫__consumer_offsets的topic中获取。为了兼容0.8.2.x版本的Kafka,这里在Kafka Eagle中另外启动一个RpcServer来贡献__consumer_offsets中的消费信息。消费__consumer_offsets这个topic时,需要指定该内部topic不暴露给consumer,将 exclude.internal.topics 设置为 false 即可。这样我们通过一个 kafka.eagle.offset.storage 开关来控制系统获取监控元数据的走向。获取流程如下图所示:

KafkaEagle源码解读

3.消费 Owner

当消费的信息存放于Zookeeper中,我们可以直接从consumer模块下直接获取对应的Owner,但是在Kafka的Topic中,我们需要编码来间接的获取。这里,我们需要知道 Kafka 的Owner的组成规则,其规则由 Group+ConusmerHostAddress+Timespan+UUID+PartitionId组成,实现细节可参考源码,界面展示如下图所示:

KafkaEagle源码解读

4.Kafka SQL

关于Kafka SQL,旨在使用SQL来快速可视化Topic的相关信息,目前 Kafka SQL 实现的功能包含有展示某一个Topic的Partition,Offset,以及其对应的消息记录,若不加limit条件限制,默认展示该Topic下最新的5000条记录,详细实现细节,可参看源码,预览截图如下所示:

KafkaEagle源码解读

查询结果,如下图所示:

KafkaEagle源码解读

5.多集群

Kafka Eagle 目前是支持多集群监控,所谓多集群,是指多个Zookeeper集群下的Kafka集群,通过切换Session来管理不同的Zookeeper集群下的Kafka集群,细节参看源码。管理界面如下图所示:

KafkaEagle源码解读

6.总结

Kafka Eagle总体实现思路基本如上所述。针对,Kafka 0.10.x版本,Kafka Eagle监控部分模块不展示的问题,这里在启动 Kafka Eagle之前,默认启动一个系统consumer来消费kafka.eagle该group下的__system.topic__,保证__consumer_offsets是有数据可供获取的。

7.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

更多相关文章
  • java5之后的java.util.concurrent包是世界级并发大师Doug Lea的作品,里面主要实现了 atomic包里Integer/Long对应的原子类,主要基于CAS: 一些同步子,包括Lock,CountDownLatch,Semaphore,FutureTask等,这些都是基于A ...
  • java5之后的java.util.concurrent包是世界级并发大师Doug Lea的作品,里面主要实现了 atomic包里Integer/Long对应的原子类,主要基于CAS: 一些同步子,包括Lock,CountDownLatch,Semaphore,FutureTask等,这些都是基于A ...
  • MailOtto实现完美预加载以及源码解读
    背景:     最近项目组需要一个小课题分享,小白刚好从微博里看到一个这样有趣的开源工具MailOtto,是阿里巴巴员工 Drakeet 维护的一个专注懒事件的事件总线,gitHub地址为:https://github.com/drakeet/MailOtto ,Drakeet的个人地址为:http ...
  • Docker网络详解及pipework源码解读与实践
    Docker作为目前最火的轻量级容器技术,有很多令人称道的功能,如Docker的镜像管理.然而,Docker同样有着很多不完善的地方,网络方面就是Docker比较薄弱的部分.因此,我们有必要深入了解Docker的网络知识,以满足更高的网络需求.本文首先介绍了Docker自身的4种网络工作方式,然后通 ...
  • SpringMVC源码解读-RequestMapping注解实现解读-ConsumesRequestCondition
    consumes  指定处理请求的提交内容类型(media-Type),例如application/json, text/html. 所以这边的ConsumesRequestCondition就是通过content type进行url过滤的. 具体分析前,我们先准备下基础知识. 1. MediaTy ...
  • 对于Java项目在运行的时候是如何工作的,这个问题我一直比较模糊,虽然知道是那三种类加载机制(bootstrapClassLoader,extendsionClassLoader和systemAppClassLoader),但具体是怎么实现的呢? Java在加载JVM的时候会先加载jdk的一些环境变 ...
  • Tomcat源码解读系列三——Tomcat对HTTP请求处理的整体流程
        前面的文章中介绍了Tomcat初始化的过程,本文将会介绍Tomcat对HTTP请求的处理的整体流程,更细节的.     在上一篇文章中,介绍到JIoEndpoint 中的内部类Acceptor用来接受Socket请求,并调用processSocket方法来进行请求的处理,所以会从本文这个方法 ...
  • Spring源码解析BeanFactory接口体系解读
    不知道为什么看着Spring的源码,感触最深的是Spring对概念的抽象,所以我就先学接口了,BeanFactory是Spring IOC实现的基础,这边定义了一系列的接口,我们通过这些接口的学习,可以大致了解BeanFactory体系各接口如何分工合作.为学习具体实现打下基础.毕竟这边逻辑复杂,涉 ...
  • 0.打包成可部署的zip:Cloudify ant cloudify.zip <target name="cloudify.zip" depends="prepare.env, cloudify.install, prepare.cloudify.package. ...
一周排行