久久99久久人婷婷精品综合_超碰aⅴ人人做人人爽欧美_亚洲电影第三页_日韩欧美一中文字暮专区_波多野结衣的一区二区三区_婷婷在线播放_人人视频精品_国产精品日韩精品欧美精品_亚洲免费黄色_欧美性猛交xxxxxxxx

PulsarIO

這篇文章將為大家詳細講解有關Pulsar IO,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

在濱海新區等地區,都構建了全面的區域性戰略布局,加強發展的系統性、市場前瞻性、產品創新能力,以專注、極致的服務理念,為客戶提供成都網站制作、成都網站建設 網站設計制作按需制作網站,公司網站建設,企業網站建設,成都品牌網站建設,營銷型網站,成都外貿網站建設公司,濱海新區網站建設費用合理。

Apache Pulsar 是業界領先的消息系統。使用消息系統時,一個較為常見的問題就是:將數據移入或移出消息平臺的最佳方法是什么?

當然,用戶可以使用 Pulsar 的 consumer 和 producer API 編寫自定義代碼,來傳輸數據。但除此之外,是否還有其他方法呢?

以下為用戶提出的一些相關問題:

1. 要將數據發布到 Pulsar 或使用 Pulsar 中的數據,我應該在哪里運行相應程序?

2. 要將數據發布到 Pulsar 或使用 Pulsar 中的數據,我應該怎樣運行相應程序? 

用戶之所以會提出這些問題,是因為其他消息/發布-訂閱系統沒有提供有組織且容錯的方式來幫助用戶從外部系統輸入數據或將數據輸出到外部系統,因而用戶需要尋求自定義解決方案并手動運行。

為了解決上述問題并簡化這一過程,我們推出了 Pulsar IO。

Pulsar IO 通過利用現有的 Pulsar Functions 框架來輸入/輸出數據。而 Pulsar Functions 框架的所有優勢(如:容錯性、并行性、彈性、負載平衡、按需更新等)都可以直接被 Pulsar 輸入/輸出數據的應用程序所利用。

而且,我們發現經常會出現這樣的情況,用戶花很大功夫(因為他們不是消息系統方面的專家,可能也不想成為這一領域的專家)去編寫自定義程序,用于從消息傳遞系統訪問數據。

自定義編寫這些應用程序不僅會很困難,而且我們發現,許多用戶在嘗試實現執行相同功能的應用程序時,做了相同的工作。歸根結底,消息系統只是用于移動數據的工具,因此,在設計 Pulsar IO 框架時,我們的主要目標之一就是易用性。

我們希望用戶能夠在不編寫任何代碼,也不用同時成為 Pulsar 和外部系統專家的情況下,可以從外部系統輸入數據或將數據輸出到外部系統。

Pulsar IO 框架是什么樣的?

首先,我們定義兩個應用程序,一個作為 source 將數據輸入到 Pulsar ,另一個作為 sink 從 Pulsar 接收數據。

Pulsar IO

Source 將數據從外部系統導入 Pulsar,而 sink 將數據從 Pulsar 導出到外部系統。具體來看,source 從外部系統讀取數據,并將數據寫入 Pulsar topic,而 sink 從一個或多個 Pulsar topic 讀取數據,并將數據寫入外部系統。

Pulsar IO 框架在現有的 Pulsar functions 框架上運行。單個 source 和 sink 可以像 function 一樣與 Pulsar broker 一起運行,如下圖所示。

Pulsar IO

因此,Pulsar Functions 框架的所有優勢都適用于 Pulsar IO 框架,即 sink 和 source 應用程序。

正如前面提到的,我們的設計目標包括用戶無需編寫任何自定義應用程序,也無需編寫任何代碼就可以將數據移入或移出 Pulsar。

因此,Pulsar IO 框架中有多種內置 source 和 sink(Kafka、Twitter Firehose、Cassandra、Aerospike 等,還會支持更多),用戶只需使用一個命令便可運行。用戶因此可以關注于業務邏輯,而無需擔心實現細節。



如何使用 Pulsar IO


使用 Pulsar IO 框架很容易。用戶可以在命令行界面使用一行簡單的命令啟動內置 source 或 sink。例如,用戶可以用下面的命令來提交 source 到已有的 Pulsar 集群,命令格式如下:

$ ./bin/pulsar-admin source create \      --tenant <tenant> \      --namespace <namespace> \      --name <source-name> \      --destinationTopicName <input-topics> \      --source-type <source-type>

以下示例為運行 twitter firehose source 的命令,用于將 Twitter 中的數據導入 Pulsar:

$ ./bin/pulsar-admin source create \--tenant test \  --namespace ns1 \  --name twitter-source \  --destinationTopicName twitter_data \  --sourceConfigFile examples/twitter.yml \  --source-type twitter

經過以上步驟,用戶即可向 Pulsar 輸入數據,而無需編寫或編譯任何代碼。唯一可能需要的是一個配置文件,用于為該 source 或 sink 指定某些配置。用戶可以通過以下格式的命令向現有的 Pulsar 集群中提交待運行的內置 sink:

$ ./bin/pulsar-admin sink create \     --tenant <tenant> \     --namespace <namespace> \     --name <sink-name> \     --inputs <input-topics> \     --sink-type <sink-type>

以下為運行 Cassandra sink 的示例命令,用于將數據從 Pulsar 導出到 Cassandra:

$ ./bin/pulsar-admin sink create \     --tenant public \     --namespace default \     --name cassandra-test-sink \     --sink-type cassandra \     --sinkConfigFile examples/cassandra-sink.yml \     --inputs test_cassandra

更多關于如何運行 Cassandra source 的信息,參閱快速入門指南:
https://pulsar.apache.org/docs/en/2.1.1-incubating/io-quickstart/

以上命令顯示了如何在“集群”模式下(即作為現有 Pulsar 集群的一部分)運行 source 和 sink。除此之外,還可以在“本地運行”模式下將 source 和 sink 作為獨立進程運行,這一模式會在機器上生成本地進程并且運行 source 或者 sink 的邏輯。

本地運行模式有助于測試和調試,但是,需要用戶自行監控和監督。以下為在本地運行模式下運行 source 的命令示例:

$ ./bin/pulsar-admin sink localrun \  --tenant public \     --namespace default \     --name cassandra-test-sink \     --sink-type cassandra \     --sinkConfigFile examples/cassandra-sink.yml \     --inputs test_cassandra

由于 Pulsar IO 框架在 Pulsar Functions 上運行,因此可以通過更新參數和配置來動態更新 source 或 sink。例如,當希望利用前面提到的 Twitter firehose source 將數據輸入到另一個 Pulsar topic 時,可以執行以下命令:

$ ./bin/pulsar-admin source update \--tenant test \  --namespace ns1 \  --name twitter-source \  --destinationTopicName twitter_data_2 \  --sourceConfigFile examples/twitter.yml \  --source-type twitter

?

也可以使用同樣格式的命令更新 sink。大多數 source 和 sink 的更新都可以在運行時進行配置,從而簡化修改、測試、部署等流程。

如果要自定義實現一個小眾的用例,則可以通過實現一個簡單的界面來創建 source 或 sink。但是,Pulsar IO 的目的是幫助用戶直接使用現有的內置 source 或 sink,而不必自己手動實現 source 或 sink。

???? 實現自定義 source

要創建自定義 source,用戶需要編寫一個實現 source 接口的 Java 類:

public interface Source<T> extends AutoCloseable {/** * Open source with configuration * * @param config initialization config * @throws Exception IO type exceptions when opening a connector */    void open(final Map<String, Object> config) throws Exception;    /**     * Reads the next message from source.     * If source does not have any new messages, this call should block.     * @return next message from source.  The return result should never be null     * @throws Exception    */    Record<T> read() throws Exception;}

這是一個 source 實現的簡單示例:

public class TestSource implements Source<Integer> {    private int i = 0;    @Override    public void open(Map<String, Object> config) throws Exception {    }    @Override    public Record<Integer> read() throws Exception {       return () -> i++;    }    @Override    public void close() throws Exception {    }}

在上面的 source 示例中,單調遞增的整數被傳入到 Pulsar。

實現 “Record” 接口的對象需要通過 “read” 方法返回,因為 “Record” 接口包含可用于實現不同消息傳遞語義或保證的字段,例如 exactly-once/effectively-once。在后續文章中,我將詳細討論如何執行此操作。

???? 實現自定義 sink

要創建自定義 sink,用戶需要編寫一個實現 sink 接口的 Java 類:

public interface Sink<T> extends AutoCloseable{    /**    * Open Sink with configuration    *    * @param config initialization config    * @throws Exception IO type exceptions when opening a connector    */   void open(final Map<String, Object> config) throws Exception;   /**    * Write a message to Sink    * @param inputRecordContext Context of value    * @param value value to write to sink    * @throws Exception    */   void write(RecordContext inputRecordContext, T value) throws Exception;}

例如,一個簡單的 sink 實現:

public class TestSink implements Sink<String> {private static final String FILENAME = "/tmp/test-out";private BufferedWriter bw = null;private FileWriter fw = null;@Overridepublic void open(Map<String, Object> config) throws Exception {        File file = new File(FILENAME);// if file doesnt exists, then create itif (!file.exists()) {           file.createNewFile();        }        fw = new FileWriter(file.getAbsoluteFile(), true);        bw = new BufferedWriter(fw);    }@Overridepublic void write(RecordContext inputRecordContext, String value) throws Exception {try {            bw.write(value);            bw.flush();        } catch (IOException e) {throw new RuntimeException(e);        }    }@Overridepublic void close() throws Exception {try {if (bw != null)                bw.close();if (fw != null)                fw.close();        } catch (IOException ex) {            ex.printStackTrace();        }    }}

以上示例說明 sink 如何從 Pulsar 讀取數據并寫入文件。與 source 接口類似,sink 接口中的 “write” 方法有一個 RecordContext 參數。此參數為 sink 提供需要寫入外部系統的值的 context。

RecordContext 參數可用于實現能夠提供不同級別的消息傳遞語義或保證(如:Exactly-once/Effective-once)的 sink。在后續文章中,我們將對此進行更深入的討論。

用戶可以通過類似于運行內置 source 和 sink 的方式來提交自定義 source 和 sink:

$ ./bin/pulsar-admin source create \  --className  <classname> \  --jar <jar-location> \  --tenant <tenant> \  --namespace <namespace> \  --name <source-name> \  --destinationTopicName <output-topic>

命令示例如下:

$ ./bin/pulsar-admin source create \  --className org.apache.pulsar.io.twitter.TwitterFireHose \  --jar \~/application.jar \  --tenant test \  --namespace ns1 \  --name twitter-source \  --destinationTopicName twitter_data

在現有 Pulsar 集群中提交待運行的自定義 sink 的命令格式如下:

$ ./bin/pulsar-admin sink create \--className  <classname> \--jar <jar-location> \--tenant test \--namespace <namespace> \--name <sink-name> \--inputs <input-topics>

命令示例:

 $ ./bin/pulsar-admin sink create \--className  org.apache.pulsar.io.cassandra \--jar \~/application.jar \--tenant test \--namespace ns1 \--name cassandra-sink \--inputs test_topic```



使用 Pulsar IO 框架的優勢


如上所述,Pulsar IO 框架在現有的 Pulsar Functions 框架上運行。Pulsar IO 充分利用了現有的 Pulsar Functions 框架。作為 Pulsar IO 的組成部分,source 和 sink 擁有 Pulsar Functions 的所有優勢:

Pulsar IO

關于Pulsar IO就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

分享標題:PulsarIO
URL網址:http://www.js-pz168.com/article8/ihicip.html

成都網站建設公司_創新互聯,為您提供搜索引擎優化外貿建站手機網站建設移動網站建設Google軟件開發

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

網站優化排名
久久99久久人婷婷精品综合_超碰aⅴ人人做人人爽欧美_亚洲电影第三页_日韩欧美一中文字暮专区_波多野结衣的一区二区三区_婷婷在线播放_人人视频精品_国产精品日韩精品欧美精品_亚洲免费黄色_欧美性猛交xxxxxxxx
一区二区三区在线观看www| 在线观看日韩一区| 欧美日韩一级视频| 久久综合久久鬼色中文字| 亚洲欧美欧美一区二区三区| 日本不卡123| av激情亚洲男人天堂| 青娱乐一区二区| 91精品欧美久久久久久动漫 | 久久久天堂av| 亚洲宅男天堂在线观看无病毒| 国产一区久久久| 国产在线一区二区三区播放| 91国产成人在线| 国产日韩欧美不卡| 日本不卡视频在线| av在线不卡一区| 在线影视一区二区三区| 久久久99精品免费观看不卡| 首页国产欧美久久| 91免费看视频| 在线影院国内精品| 国产精品免费aⅴ片在线观看| 青草国产精品久久久久久| 99re在线观看视频| 精品视频在线免费观看| 国产精品成人免费在线| 国产一区二区三区最好精华液| 国产一级特黄a大片99| 欧美日韩国产经典色站一区二区三区 | 国产一区二区不卡视频在线观看 | 久草热久草热线频97精品| 欧美日韩国产精品成人| 亚洲精品v日韩精品| 成人免费看视频| 亚洲综合视频一区| 国产欧美一区二区精品仙草咪| 美女脱光内衣内裤视频久久影院| 国产 高清 精品 在线 a| 欧美日韩国产综合视频在线观看| 日韩毛片在线免费观看| 粉嫩嫩av羞羞动漫久久久| 一本一道久久a久久精品综合| 国产亚洲综合性久久久影院| 久久精品国产澳门| 欧美日韩一区二区视频在线观看| 精品乱码亚洲一区二区不卡| 日韩激情av在线| 久久婷婷国产综合尤物精品| 欧美r级在线观看| 美女视频一区二区三区| 免费观看成人高| 久久久久久影视| 国产一区二区不卡在线| 亚洲乱码一区二区三区| 国产精品不卡视频| www.日韩在线| 欧美日韩国产综合视频在线观看| 亚洲综合一区二区| 91久久精品国产91久久性色tv| 欧美精品高清视频| 日日夜夜精品视频天天综合网| 精品无人区一区二区三区| 精品国产乱子伦一区| 激情都市一区二区| 亚洲巨乳在线观看| 亚洲精品视频在线观看网站| 97神马电影| 日韩精品中文字幕在线一区| 狠狠色狠狠色综合系列| 在线观看日韩片| 一区二区三区不卡在线观看| 国产精品视频入口| 久久一区二区三区国产精品| 国产伦理精品不卡| 欧美亚洲日本国产| 午夜精品免费在线| 日本精品免费| 亚洲视频你懂的| 国产精品一 二 三| 国产亚洲精品7777| av不卡一区二区三区| 欧美一区二区三区人| 欧美日韩激情在线| 天堂va蜜桃一区二区三区漫画版| 免费在线观看91| 国产精品久久久久久久久图文区| 97久久人人超碰| 91麻豆精品国产91久久久久久久久 | 欧美一级欧美一级在线播放| 精品亚洲aⅴ乱码一区二区三区| 色综合一区二区| 欧美视频在线一区二区三区| 亚洲bt欧美bt精品| 日韩中文一区二区三区| 亚洲男人的天堂网| 久久久综合亚洲91久久98| 国产精品欧美综合在线| 国产成人av一区二区三区| 久久久久久久久久久久久夜| 成人97人人超碰人人99| 欧美电视剧在线观看完整版| 国产99久久久国产精品潘金网站| 91精品国产91热久久久做人人| 狠狠色丁香婷婷综合久久片| 欧美日韩国产综合视频在线观看| 久久99精品国产| 欧美日本免费一区二区三区| 国产一区999| 欧美一级黄色录像| 成av人片一区二区| 精品国产乱码久久久久久蜜臀 | 久久99精品久久久久久久青青日本| 国产精品网站在线观看| 国产精品制服诱惑| 亚洲色图一区二区三区| 欧美精品一区二区三区在线看午夜 | 国产亚洲欧美一区二区 | 国产九色精品| 亚洲色大成网站www久久九九| 欧美精品久久| 亚洲h动漫在线| 色狠狠综合天天综合综合| 另类小说综合欧美亚洲| 欧美久久免费观看| 成人免费黄色大片| 久久九九国产精品| 国产精品一区二区不卡视频| 亚洲欧美激情在线| 亚洲视频sss| 蜜乳av一区二区| 制服丝袜国产精品| 国产精品久久久久影院色老大 | 国产成人午夜视频| 亚洲精品一区在线观看| 国产a一区二区| 亚洲精品高清视频在线观看| 亚洲一区二区三区涩| 麻豆91精品视频| 日韩一级黄色大片| 99热国产免费| 一区二区三区在线观看网站| 制服诱惑一区| 国产精品亚洲一区二区三区在线 | 在线日韩一区二区| 国产ts人妖一区二区| 国产亚洲va综合人人澡精品| 国产九色精品| 亚洲电影中文字幕在线观看| 欧美自拍丝袜亚洲| 成人高清在线视频| 国产精品久久久久久妇女6080| 欧洲亚洲一区| 久久99久久精品| 精品国产免费人成电影在线观看四季 | 夜夜亚洲天天久久| 欧美性受xxxx黑人xyx性爽| 国产成人精品三级麻豆| 国产欧美日韩卡一| 色一情一区二区三区四区| 极品少妇xxxx精品少妇| 久久久久99精品一区| 人禽交欧美网站免费| 麻豆精品在线播放| 久久先锋影音av| 欧美在线视频二区| 国产一区二区三区免费播放| 国产网站一区二区三区| 台湾成人av| 高清视频一区二区| 日韩一区欧美小说| 欧洲一区二区av| 91传媒视频在线观看| 亚洲成在线观看| 日韩写真欧美这视频| 久久96国产精品久久99软件| 美女爽到高潮91| 国产清纯在线一区二区www| 视频一区二区三| 国产a视频精品免费观看| 亚洲色图制服诱惑| 欧美日韩免费视频| 国产精品嫩草在线观看| 日韩电影在线一区二区三区| 精品久久久久久最新网址| 欧美最大成人综合网| 国产成人av一区二区| 亚洲乱码国产乱码精品精小说 | 中文字幕乱码一区二区免费| 中文一区一区三区免费| 99久久国产综合精品麻豆| 亚洲第一精品在线| ww亚洲ww在线观看国产| 亚洲一区二区三区免费观看| 99久久99久久精品国产片果冻 | 国产精品国产自产拍高清av王其| 在线观看亚洲a| 国产一区二区三区四区五区加勒比| 麻豆成人91精品二区三区| 国产精品久久久久久久久久久免费看 |