這篇文章主要講解了Python把Spark數(shù)據(jù)寫入ElasticSearch的方法,內(nèi)容清晰明了,對此有興趣的小伙伴可以學(xué)習(xí)一下,相信大家閱讀完之后會有幫助。

如果使用Scala或Java的話,Spark提供自帶了支持寫入ES的支持庫,但Python不支持。所以首先你需要去這里下載依賴的ES官方開發(fā)的依賴包包。
下載完成后,放在本地目錄,以下面命令方式啟動(dòng)pyspark:
pyspark --jars elasticsearch-hadoop-6.4.1.jar
如果你想pyspark使用Python3,請?jiān)O(shè)置環(huán)境變量:
export PYSPARK_PYTHON=/usr/bin/python3
理解如何寫入ES的關(guān)鍵是要明白,ES是一個(gè)JSON格式的數(shù)據(jù)庫,它有一個(gè)必須的要求。數(shù)據(jù)格式必須采用以下格式
{ "id: { the rest of your json}}
往下會展示如何轉(zhuǎn)換成這種格式。
解析Apache日志文件
我們將Apache的日志文件讀入,構(gòu)建Spark RDD。然后我們寫一個(gè)parse()函數(shù)用正則表達(dá)式處理每條日志,提取我們需要的字
rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
p=re.compile(regex)
def parse(str):
s=p.match(str)
d = {}
d['ip']=s.group(1)
d['date']=s.group(4)
d['operation']=s.group(5)
d['uri']=s.group(6)
return d另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
本文題目:Python把Spark數(shù)據(jù)寫入ElasticSearch的方法-創(chuàng)新互聯(lián)
瀏覽路徑:http://www.js-pz168.com/article1/ccpgid.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營銷推廣、Google、移動(dòng)網(wǎng)站建設(shè)、網(wǎng)站建設(shè)、品牌網(wǎng)站制作、微信公眾號
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容