博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
大数据处理框架之Strom:容错机制
阅读量:5101 次
发布时间:2019-06-13

本文共 7442 字,大约阅读时间需要 24 分钟。

1、集群节点宕机

Nimbus服务器
  单点故障,大部分时间是闲置的,在supervisor挂掉时会影响,所以宕机影响不大,重启即可
非Nimbus服务器
  故障时,该节点上所有Task任务都会超时,Nimbus会将这些Task任务重新分配到其他服务器上运行

2、进程挂掉

Worker
  挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该Worker重新分配到其他服务器上
Supervisor
  无状态(所有的状态信息都存放在Zookeeper中来管理)
  快速失败(每当遇到任何异常情况,都会自动毁灭)
Nimbus
  无状态(所有的状态信息都存放在Zookeeper中来管理)
  快速失败(每当遇到任何异常情况,都会自动毁灭)

3、消息的完整性

从Spout中发出的Tuple,以及基于他所产生Tuple,由这些消息就构成了一棵tuple树,当这棵tuple树发送完成,并且树当中每一条消息都被正确处理,就表明spout发送消息被“完整处理”,即消息的完整性,storm使用Acker确保消息完整性,Acker是拓扑当中特殊的一些任务,负责跟踪每个Spout发出的Tuple的DAG(有向无环图)
Acker分为ack确认机制和fail失败处理机制,Spout作为数据源,当拓扑中bolt处理失败时该怎么办?Acker机制可以重发数据到bolt进行重新处理。

看下面的例子:

MessageSpout  ---->   split-bolt  ---->    write-bolt

MessageTopology
package bhz.topology;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;import bhz.bolt.SpliterBolt;import bhz.bolt.WriterBolt;import bhz.spout.MessageSpout;public class MessageTopology {        public static void main(String[] args) throws Exception {        TopologyBuilder builder = new TopologyBuilder();        builder.setSpout("spout", new MessageSpout());        builder.setBolt("split-bolt", new SpliterBolt()).shuffleGrouping("spout");        builder.setBolt("write-bolt", new WriterBolt()).shuffleGrouping("split-bolt");        //本地配置        Config config = new Config();        config.setDebug(false);        LocalCluster cluster = new LocalCluster();        System.out.println(cluster);        cluster.submitTopology("message", config, builder.createTopology());        Thread.sleep(10000);        cluster.killTopology("message");        cluster.shutdown();    }}

MessageSpout

package bhz.spout;import java.util.Map;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;public class MessageSpout implements IRichSpout {    private static final long serialVersionUID = 1L;    private int index = 0;        private String[] subjects = new String[]{            "groovy,oeacnbase",            "openfire,restful",            "flume,activiti",            "hadoop,hbase",            "spark,sqoop"            };        private SpoutOutputCollector collector;        @Override    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {        this.collector = collector;    }        @Override    public void nextTuple() {        if(index < subjects.length){            String sub = subjects[index];            //发送信息参数1 为数值, 参数2为msgId            collector.emit(new Values(sub), index);            index++;        }    }        @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("subjects"));    }    //当bolt 处理成功  ack确认 spout执行ack方法    @Override    public void ack(Object msgId) {        System.out.println("【消息发送成功!!!】 (msgId = " + msgId +")");    }    //当bolt处理失败时,spout调用fail方法,进行重发处理    @Override    public void fail(Object msgId) {        System.out.println("【消息发送失败!!!】  (msgId = " + msgId +")");        System.out.println("【重发进行中...】");        collector.emit(new Values(subjects[(Integer) msgId]), msgId);        System.out.println("【重发成功!!!】");    }    @Override    public void close() {    }    @Override    public void activate() {    }    @Override    public void deactivate() {    }    @Override    public Map
getComponentConfiguration() { return null; }}

 

SpliterBolt

package bhz.bolt;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class SpliterBolt implements IRichBolt {    private static final long serialVersionUID = 1L;    private OutputCollector collector;        @Override    public void prepare(Map config, TopologyContext context, OutputCollector collector) {        this.collector = collector;    }            private boolean flag = false;        @Override    public void execute(Tuple tuple) {        try {            String subjects = tuple.getStringByField("subjects");                        if(!flag && subjects.equals("flume,activiti")){                flag = true;                int a = 1/0;            }                        String[] words = subjects.split(",");            //List
list = new ArrayList
(); //int index = 0; for (String word : words) { //注意这里循环发送消息,要携带tuple对象,用于处理异常时重发策略 collector.emit(tuple, new Values(word)); //list.add(word); //index ++; } //collector.emit(tuple, new Values(list)); collector.ack(tuple);//通知spout处理成功 } catch (Exception e) { e.printStackTrace(); collector.fail(tuple);//通知spout 处理失败 } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public void cleanup() { } @Override public Map
getComponentConfiguration() { return null; }}

WriterBolt

package bhz.bolt;import java.io.FileWriter;import java.io.IOException;import java.util.List;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class WriterBolt implements IRichBolt {    private static final long serialVersionUID = 1L;    private FileWriter writer;    private OutputCollector collector;    @Override    public void prepare(Map config, TopologyContext context, OutputCollector collector) {        this.collector = collector;        try {            writer = new FileWriter("d://message.txt");        } catch (IOException e) {            e.printStackTrace();        }    }    private boolean flag = false;        @Override    public void execute(Tuple tuple) {        String word = tuple.getString(0);//        List
list = (List
)tuple.getValueByField("word");// System.out.println("======================" + list); try { if(!flag && word.equals("hadoop")){ flag = true; int a = 1/0; } writer.write(word); writer.write("\r\n"); writer.flush(); } catch (Exception e) { e.printStackTrace(); collector.fail(tuple);//通知spout处理失败 } collector.emit(tuple, new Values(word)); collector.ack(tuple);//通知spout处理成功 } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public Map
getComponentConfiguration() { return null; }}

 

spout重发机制会带来一个问题:数据重复消费,看上面的例子当WriterBolt执行失败的时候,spout 将hadoop,hbase重发,那么hbase会被WriterBolt再执行一次,目前storm对此没有保障机制,按照业务设计的通用做法就是使用幂等性(比如使用唯一性ID),防止重复消费数据。

 

转载于:https://www.cnblogs.com/cac2020/p/9857697.html

你可能感兴趣的文章
一道不知道哪里来的容斥题
查看>>
Blender Python UV 学习
查看>>
window添加右键菜单
查看>>
入手腾龙SP AF90mm MACRO
查看>>
python学习4 常用内置模块
查看>>
Window7上搭建symfony开发环境(PEAR)
查看>>
ResolveUrl的用法
查看>>
Linux内核态、用户态简介与IntelCPU特权级别--Ring0-3
查看>>
第23月第24天 git命令 .git-credentials git rm --cached git stash clear
查看>>
java SE :标准输入/输出
查看>>
一些方便系统诊断的bash函数
查看>>
<转>关于MFC的多线程类 CSemaphore,CMutex,CCriticalSection,CEvent
查看>>
jquery中ajax返回值无法传递到上层函数
查看>>
css3之transform-origin
查看>>
[转]JavaScript快速检测浏览器对CSS3特性的支持
查看>>
Master选举原理
查看>>
[ JAVA编程 ] double类型计算精度丢失问题及解决方法
查看>>
小别离
查看>>
微信小程序-发起 HTTPS 请求
查看>>
WPF动画设置1(转)
查看>>