Real-Time Log Analysis

I recently needed to do some real-time log analysis. Spark Streaming was the obvious choice, but how to collect the logs, how to process them, and how to display the results all left me scratching my head at first.

After some studying, I settled on the following approach.

Apache Flume collects the logs and forwards them into Apache Kafka; Spark Streaming subscribes to the Kafka log streams for real-time analysis; and the results are displayed live in Apache Zeppelin.

Log Generation

A Python script generates test log entries in real time.

import time
import random
from datetime import datetime

path = "/Users/cxmokai/Desktop/test.log"

while True:
    number = random.randint(1, 100)
    now = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
    # append (not overwrite) so that `tail -f`, used by the Flume exec source later, sees every line
    with open(path, 'a') as f:
        if number % 2:
            f.write("[%s] ODD %d\n" % (now, number))
        else:
            f.write("[%s] EVEN %d\n" % (now, number))
    time.sleep(1)

The script generates a random integer between 1 and 100 every second and records whether the number is odd or even, along with the time it was produced, for example:

[2016-09-18 15:03:59] EVEN 92
[2016-09-18 15:04:00] EVEN 34
[2016-09-18 15:04:01] EVEN 82
[2016-09-18 15:04:02] EVEN 38
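
To exercise the pipeline, run the generator in the background and confirm the file keeps growing (the script filename below is hypothetical; the path is the same one the Flume exec source will tail later):

# run the log generator in the background
python gen_test_log.py &
# watch new entries arrive; Flume will tail this same file
tail -f /Users/cxmokai/Desktop/test.log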

Kafka Service

Download the latest Kafka release; at the time of writing the latest version is 0.10.0.1.

Unpack the archive and it is ready to use out of the box.

ZooKeeper must be running before Kafka starts; the Kafka distribution ships with a bundled ZooKeeper.

Start ZooKeeper

# from the Kafka directory
bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka

# from the Kafka directory
bin/kafka-server-start.sh config/server.properties

Kafka listens on port 9092 by default and ZooKeeper on 2181; both can be changed in server.properties and zookeeper.properties under the config directory.
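
Kafka normally auto-creates the ODD and EVEN topics the first time Flume writes to them (auto.create.topics.enable defaults to true), but pre-creating and listing them is a convenient sanity check that the broker and ZooKeeper are reachable. A rough sketch, run from the Kafka directory:

# optional: pre-create the two topics the Flume interceptor will route to
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic ODD --partitions 1 --replication-factor 1
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic EVEN --partitions 1 --replication-factor 1
# list topics to confirm everything is up
bin/kafka-topics.sh --zookeeper localhost:2181 --list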

Flume Service

Download the latest Flume release; the latest version is 1.6.0.

Unpack the archive.

Configure Flume (the startup command below expects the file at conf/flume.conf)

a1.sources = r1
a1.channels = c1
a1.sinks = k1
# read events from the generated test log file
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /Users/cxmokai/Desktop/test.log
# use a regex extractor interceptor to add a `topic` header to each event, routing odd and even entries to two different Kafka topics
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = ^.*(ODD|EVEN).*$
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = topic
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# deliver the events to Kafka
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = localhost:9092

Start Flume

bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n a1
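
If the agent starts cleanly, log entries should begin arriving in Kafka within a few seconds. A quick way to verify the Flume-to-Kafka leg is the console consumer bundled with Kafka, run from the Kafka directory:

# print events from the ODD topic as they arrive
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ODD --from-beginning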

Spark Streaming Analysis

// Log record class
package logs

import scala.util.Try

case class OddEvenLog(var date: String, var time: String, var kind: String, var number: String) extends Serializable

object OddEvenLog {
  // matches lines like "[2016-09-18 15:03:59] EVEN 92"
  private val LOG_ENTRY_PATTERN = """^\[(\S+) (\S+)\] (\S+) (\S+)""".r

  // returns Failure for lines that do not match the pattern
  def parseFromLogLine(logline: String): Try[OddEvenLog] = {
    Try {
      val LOG_ENTRY_PATTERN(a, b, c, d) = logline
      new OddEvenLog(a, b, c, d)
    }
  }
}

// The analysis job
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import logs.OddEvenLog.parseFromLogLine
import org.apache.log4j.Logger
import org.apache.log4j.Level

object LogAnalyzerKafka {

  val checkpointPath = "hdfs://localhost:9000/tmp"

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    val ssc = StreamingContext.getOrCreate(checkpointPath, createStreamingContext)
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }

  def createStreamingContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("Logs Analyzer Kafka")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointPath)

    // subscribe directly to the two topics Flume routes the log entries to
    val brokers = "localhost:9092"
    val topics = "ODD,EVEN"
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // parse each line, key it by (half-hour window, kind) and keep a running count
    val lines = messages.map(_._2)
      .map(parseFromLogLine)
      .filter(_.isSuccess)
      .map(line => (x(line.get.time, line.get.kind), 1L))
      .updateStateByKey(updateRunningSum)

    lines.foreachRDD(rdd => {
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._
      val df = rdd.map(pair => Record(pair._1._1, pair._1._2, pair._2)).toDF()
      // register the counts as a temp view so they can be queried via Spark SQL
      df.createOrReplaceTempView("x")
      val countsDataFrame = spark.sql("select * from x")
      countsDataFrame.show()
    })
    ssc
  }

  // bucket a time of day into half-hour windows, labelled by the window's end time
  def x(time: String, kind: String): (String, String) = {
    val minute = time.split(":")(1).toInt
    val hour = time.split(":")(0).toInt
    if (minute > 30) {
      (((hour + 1) % 24) + ":00", kind)
    } else {
      (hour + ":30", kind)
    }
  }

  // each incoming value is 1L, so values.size counts the new events in this batch
  def updateRunningSum(values: Seq[Long], state: Option[Long]) = {
    Some(state.getOrElse(0L) + values.size)
  }

  case class Record(time: String, kind: String, count: Long)

  /** Lazily instantiated singleton instance of SparkSession */
  object SparkSessionSingleton {
    @transient private var instance: org.apache.spark.sql.SparkSession = _

    def getInstance(sparkConf: org.apache.spark.SparkConf): org.apache.spark.sql.SparkSession = {
      if (instance == null) {
        instance = org.apache.spark.sql.SparkSession
          .builder
          .config(sparkConf)
          .getOrCreate()
      }
      instance
    }
  }
}
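
One thing to watch: checkpointPath points at an HDFS namenode on localhost:9000, so a local Hadoop/HDFS instance must be running and the path must be writable before the job starts (or swap in a local directory instead). A quick check, assuming the hdfs CLI is on the PATH and fs.defaultFS points at that namenode:

# make sure the checkpoint directory exists
hdfs dfs -mkdir -p /tmp
hdfs dfs -ls /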

Zeppelin Service

Download the latest Zeppelin release; at the time of writing the latest version is 0.6.1.

Unpack the archive and it is ready to use out of the box.

Start Zeppelin

bin/zeppelin-daemon.sh start

Zeppelin listens on port 8080 by default; this can be changed through the zeppelin.server.port property in conf/zeppelin-site.xml.
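
In a fresh download conf/zeppelin-site.xml may not exist yet; it can usually be created from the template shipped in the conf directory before editing the port:

# from the Zeppelin directory
cp conf/zeppelin-site.xml.template conf/zeppelin-site.xml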

Create a new notebook and paste the code above into it. Zeppelin comes with a Spark environment by default, but it lacks the Kafka-related dependencies, so add org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0, org.apache.kafka:kafka-clients:0.8.2.1 and org.apache.kafka:kafka_2.11:0.8.2.1 under the interpreter's Dependencies settings.

Since the log stream has been registered as the Spark SQL table x (ignore my terrible naming), we can create another notebook and run SQL queries against it in its paragraphs.

%sql
select * from x where kind="ODD"

%sql
select * from x where kind="EVEN"

In Zeppelin, SQL query results are automatically rendered as charts.

The button at the top of the notebook configures a cron schedule; set the notebook containing the SQL queries to run once a minute, and the simple real-time analysis demo is complete.