對于大數據我相信大多數人都不陌生,特別現在是在大數據時代,當下流行的技術很多人都可以朗朗上口比如hadoop、hbase、spark、storm,那么在數據收集中,是否有好的解決辦法來幫助企業進行安全管理及策略分析呢,筆者搜索了很多資料,通過自己的實踐分享一篇我對大數據日志分析及處理的見解,本篇技術是基于logstash elasticsearch、redis結合ossec來做的開源方案。 在《無處可藏》中斯諾登提到NSA對網絡及數據收集的原則是收集一切,在企業安全上也需要做到如此,眾所周知安全的風險來自于邊界,黑客也會挑一些比較邊緣的業務進行入侵,但這些東西我們都沒記錄但對于后續的應急響應和入侵檢測來說就無從談起,所以在企業內做大數據的原則也是“收集一切”。 開篇提到的這些主要是為了下文做鋪墊,本文基于系統安全日志來做收集及處理,希望能給大家提供一些思路,整體為開源技術,屌絲安全的本質是“快”,當前社會唯快不破。
Ossec(事件源、alert源) Logstash (日志收集、分割日志) Elasticsearch (全文搜索) Kibana3 (日志展現) Redis(用于數據存儲,防止數據丟失)
使用該方案好處:opensource, 不方便處:資料較少,尤其是對Kibana
關于如何安裝,請參考: https://app.yinxiang.com/shard/s21/sh/f4e62686-16ef-4549-beb1-c5124b232df6/f538a1ea304ff4191acf494a1a1bd4f9
1、 系統日志收集:
操作系統日志收集可以采取syslog、rsyslog等技術,本文使用syslog主要對于收集日志安全日志,日志內容范圍為/var/log/secure,、/var/log/lastlog等,/var/log/secure內基于用戶的登錄失敗、登錄異常、是否從白名單ip登錄等都可以審計到,然后來做關聯分析,更多的維度需要各位看官自己去發現和探索,本文不深入講解。
日志收集作用: 應用場景:系統安全日志薄弱在黑客防御過程中,如果日志被刪除,syslog服務被暫停,這些都會對系統入侵分析造成麻煩,難以追溯。
2、 入侵檢測系統ossec:
Syslog日志進來后不能不做分析,如果不分析,數據就不會被用活只是死數據,這里用到開源ids系統,Ossec,Ossec大家或許并不陌生,ossec 支持2種模式:1、ossec agent; 2、基于syslog方式對日志做收集,通過客戶端配置syslog將日志傳送到ossec server,然后ossec server會通過對分析日志進格式化處理規則解析,判斷異常并且對其做處理,比如寫入數據庫,觸發告警,如圖1為處理日志過程。
如圖2為ossec整個工作過程:
3、 日志集中化管理 logstash:
Logstash是一個完全開源的工具,他可以對你的日志進行收集、分析,并將其存儲供以后使用(如,搜索),您可以使用它。說到搜索,logstash帶有一個web界面,搜索和展示所有日志,但因為logstash的管理界面不如kibana美觀,這里使用Kibana做日志展現。 我安裝的logstash版本為:logstash-1.2.2-flatjar.jar,通過命令:
java -jar logstash-1.2.2-flatjar.jar agent -f logstash_agent.conf
將logstash啟動,logstash_agent.conf內容如下圖:
對上圖logstash內容進行分解,它一共做了這些事。
4、 全文檢索 elasticsearch:
ElasticSearch是一個基于Lucene構建的開源,分布式,RESTful搜索引擎。設計用于云計算中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。 在本文中ElasticSearch主要的作用,通過es讀取logstash內ossec日志進行全文檢索,方便后續日志展現及搜索,es默認端口為9200,成功開啟es后會提示如下:
ElasticSearch啟動后可以通過訪問http://127.0.0.1:9200來查看es工作是否正常,出現如下數據證明es正常工作 9300端口主要用于和es集群通信發數據。
{
"status" : 200,
"name" : "N'Gabthoth",
"version" : {
"number" : "1.2.2",
"build_hash" : "9902f08efc3ad14ce27882b991c4c56b920c9872",
"build_timestamp" : "2014-07-09T12:02:32Z",
"build_snapshot" : false,
"lucene_version" : "4.8"
},
"tagline" : "You Know, for Search"
}
通過讀取之前寫入到redis里的數據,將redis內數據寫入到es里,如下配置實現:
input {
redis
{
host => "127.0.0.1"
data_type =>"list"
port => "6379"
key => "logstash"
type => "ossec"
}
}
output {
stdout { codec => rubydebug }
if [type] == "ossec" {
elasticsearch {
host => "127.0.0.1"
port => "9300"
#cluster => "ossec"
index => "logstash-ossec-%{+YYYY.MM.dd}"
index_type => "ossec"
template_name => "template-ossec"
template => "/usr/local/share/logstash/elasticsearch_template.json"
template_overwrite => true
}
}
}
5、 日志展現:
既然已經將上述工作都做到位了,那么需要有界面來展現勞動成果及后續進行日志分析,這里使用kibana來做展現,關于kibana各位請自行搜索,我只簡單說下我的理解,它的資料很少,筆者在學習過程中也是一路摸索,推薦大家使用這個dashboard來展現日志,下載地址:https://github.com/magenx/Logstash/blob/master/kibana/kibana_dashboard.json 6、 結果展現:
6.1、 結合Ossec應用場景:
舉例,我們在日常的運維環境中會有常見的登錄事件,并且也會限制一些ip是否能登錄該主機,在ossec內我們定制一條規則來判斷是否有入侵者使用其他ip進行登錄。 在ossec里我們可以定制一條規則在sshd_rules.xml中,規則如下:
<rule id="5739" level="10">
<if_sid>5700</if_sid>
<group>authentication_failure</group>
<srcip>!10.10.2.1</srcip>
<description>not come from 10.10.2.1</description>
</rule>
</group>
if_sid是匹配syslog,sshd,如下規則:
<!-- SSHD messages -->
<group name="syslog,sshd,">
<rule id="5700" level="0" noalert="1">
<decoded_as>sshd</decoded_as>
<description>SSHD messages grouped.</description>
</rule>
如果有用戶觸發到該規則,kibana會展現告警如下:
借用一句話,安全這個東西就是這樣的,你遇到的我沒遇到過,我遇到的你可能也沒有遇到過,其實只要一說,大家就都明白了。 ” 希望此文對你有幫助,謝謝!