Spark事务处理

Posted by luoxuehuan on July 14, 2016

一、什么是事务:

1.1 情形描述:

task处理到一半,失败了。可是数据 已经在输出存储在数据库了。如果第二次继续重试task,数据是不是就重复输出了?

Spark Streaming 的任务失败,讲会自动进行重试, 数据会被多次写入到存储程序中。

1.2 事务的要求:

1.能够处理且处理一次。 2.能够输出且只输出一次。

Exactly one

举例:A 给B转账,A一定会被扣除,且被扣除一次。B能接受到A转账的数据,且只接受到一次。

问:会不会完全失败呢? 答:可能性不大,一般会被处理。 除非硬件 全 部崩溃。

数据不断流进Executor。Receiver。
Executor 中只有 具体该怎么算 这个事情。

WAL: Write-Ahead Logging[1] 预写日志系统 数据库中一种高效的日志算法,对于非内存数据库而言,磁盘I/O操作是数据库效率的一大瓶颈。在相同的数据量下,采用WAL日志的数据库系统在事务提交时,磁盘写操作只有传统的回滚日志的一半左右,大大提高了数据库磁盘I/O操作的效率,从而提高了数据库的性能。

二、怎么实现事务

一、Exactly Once 的事务处理:

1.数据零丢失:必须有可靠的数据来源和可靠的Receiver,且整个应用程序的metadata必须进行checkpoint,且通过WAL来保证数据安全。 2.Spark Streaming 1.3 的时候 为了避免WAL的性能 损失和实现Exactly Once而提供了Kafka Direct API,把Kafka作为文件存储系统!!!(兼具了流的优势和文件系统的优势。Kafka消耗磁盘和HDFS消耗磁盘是一样的。直接操作它的offset) 至此,Spark Streaming + Kafka 就构建了完美的流处理世界。 为什么完美: 1,数据不用拷贝副本。2.不用WAL 而性能损耗。3.Kafka比HDFS搞笑很多。 又是怎么解决重复消费问题呢? 直接管理offset,所以也不好重复消费数据。自己处理了哪些东西,自己知道。 所有的Executor 通过Kafka API直接消费数据。 事务 实现啦!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

Receiver在接受数据 的时候崩溃,并没有完成副本的复制。Receiver恢复之后要重新接受【前提:Kafka】 接受数据 会在zookeeper里面管理。 确认接受到信息,才会读取下面的信息。

二、数据丢失的情况机器具体的解决方式:

1,在Receiver收到数据且通过Driver的调度。Executor开始计算数据的 时候

如果Driver 突然崩溃。则此时,Executor会被Kill掉,那么Executor中的数据就会丢失。(不做WAL的情况下)

此时就必须通过,例如WAL的方式(先把数据保存,再谈其他事情),让所有的数据都通过例如HDFS的方式首先进行安全性容错处理。 此时如果Executor 中的数据 丢失的话,就可以通过WAL恢复回来。

三、数据重复读取(重复消费)的情况

Kafka是毫无疑问的第一选择。必须精通。做Spark ,Kafka的重要性绝对比HDFS重要。 在Receiver 收到数据且保存到了HDFS等持久化引擎 但是没有来得及进行updateOffsets,此时Receiver崩溃后重新启动就会通过管理Kafka的Zookeeper中元数据再次重复读取数据,但是此时Spark Streaming 认为是成功的。 但是,Kafka认为是失败的(因为没有更新offset到zookeeper中),此时就会导致数据重复消费的情况。 怎么解决:处理数据的时候可以访问到元数据信息,那么可以把元数据信息 写入到内存数据库。(MemReset) 查询一下 元数据信息是否被处理过,处理过就跳过。 每次处理都查。 一个很简单的内存数据结构。SQLite。

四、性能损失

1.通过WAL的方式弊端 :会极大的损伤Spark Streaming中Receivers 接受数据的性能。实际生产环境下,用Receivers 的情况并不多,而是用Kafka的Direct API。 2.如果通过Kafka作为数据来源的话,Kafka中有数据,然后Receiver接受的时候又会有数据副本,这个时候其实是存储资源的浪费。

五、关于Spark Streaming 数据输出多次重写及其解决方案:

1.为什么会有这个问题:

因为Spark Streaming 在计算的时候基于Spark Core ,而Spark Core 天生会做以下事情导致Spark Streaming 的结果(部分)重复输出; 1.1Task重试。 1.2Stage重试。 1.3Job重试。

2,具体的解决方案:

2.1 设置Spark.task.maxFailures (最大失败次数)次数为一。 2.2 设置spark.speculation为关闭状态(因为慢任务推测非常消耗性能,所以关闭后可以显著的提高Spark的处理性能),就不会出现两个相同的任务在运行了。

Spark Streaming on Kafka的话,Job失败后,可以设置auto.offset.reset为largest的方式。这样就自动的进行恢复。

最后再次强调,可以通过transform和foreachRDD基于业务逻辑代码进行逻辑控制来实现数据不重复消费和输出不重复! 这两个方式类似于Spark Streaming的后门,可以做任意想象的控制操作!