应用Spark技术,SoData数据机器人实现快速、通用数据治理

应用Spark技术,SoData数据机器人实现快速、通用数据治理,第1张

Spark是处理海量数据的快速通用引擎。作为大数据处理技术,Spark经常会被人们拿来与Hadoop比较。

Hadoop已经成了大数据技术的事实标准,Hadoop MapReduce也非常适合于对大规模数据集合进行批处理操作,但是其本身还存在一些缺陷。具体表现在:

1、Hadoop MapRedue的表达能力有限。所有计算都需要转换成Map和 Reduce两个操作,不能适用于所有场景,对于复杂的数据处理过程难以描述。

2、磁盘I/O开销大。Hadoop MapReduce要求每个步骤间的数据序列化到磁盘,所以I/O成本很高,导致交互分析和迭代算法开销很大,而几乎所有的最优化和机器学习都是迭代的。所以,Hadoop MapReduce不适合于交互分析和机器学习。

3、计算延迟高。如果想要完成比较复杂的工作,就必须将一系列的MapReduce作业串联起来然后顺序执行这些作业。每一个作业都是高时延的,而且只有在前一个作业完成之后下一个作业才能开始启动。因此,Hadoop MapReduce不能胜任比较复杂的、多阶段的计算服务。

Spark借鉴Hadoop MapReduce技术发展而来,继承了其分布式并行计算的优点的同时,改进了MapReduce的许多缺陷。具体优势如下:

1、Spark提供广泛的数据集操作类型(20+种),支持Java,Python和Scala API,支持交互式的Python和Scala的shell。比Hadoop更加通用。

2、Spark提供Cache机制来支持需要反复迭代的计算或者多次数据共享,减少数据读取的I/O开销。Spark使用内存缓存来提升性能,因此进行交互式分析也足够快速,缓存同时提升了迭代算法的性能,这使得Spark非常适合数据理论任务,特别是机器学习。

3、Spark提供了内存计算,把中间结果放到内存中,带来了更高的迭代运算效率。通过支持有向无环图(DAG)的分布式并行计算的编程框架,减少迭代过程中数据需要写入磁盘的需求,提高处理效率。

此外,Spark还能与Hadoop无缝衔接,Spark可以使用YARN作为它的集群管理器,可以读取HDFS、HBase等一切Hadoop的数据。

Spark在最近几年发展迅速,相较于其他大数据平台或框架,Spark的代码库最为活跃。截止目前,最新发布的版本为Spark330。

也有许多数据治理工具,为了实现实时、通用的数据治理而采用Spark技术。以飞算推出的SoData数据机器人为例,是一套实时+批次、批流一体、高效的数据开发治理工具,能够帮助企业快速实现数据应用。

相较于传统数据加工流程,SoData数据机器人实现了流批一体数据同步机制,基于Spark和Flink框架进行深度二次开发,实现数据采集、集成、转换、装载、加工、落盘全流程实时+批次处理的极致体验,秒级延迟,稳定高效平均延迟5-10s,快速响应企业数据应用需求。

除了具备Spark数据处理的优势,SoData数据机器人的Spark体系还支持从各种数据源执行SQL生成Spark字典表,边开发边调试的Spark-SQL开发,支持任意结果集输出到各类数据库。可视化的运维、开发方式也能在极大降低数据开发、治理、应用门槛的同时,提升效率。

在某综合医院的信息化建设中,SoData数据机器人曾在5分钟内完成原本需要8-9小时才能完成的数据迁移工作。

目前,SoData数据机器人已应用于金融、医疗、能源等多个行业,将持续通过创新技术,为各行业组织机构带来更优质、快速的数据开发、治理、应用体验。

个性化网页推荐引擎是一种基于用户兴趣和行为习惯等因素,通过分析用户数据并实时推荐内容的技术。在国内,有一些互联网公司和科技企业正在开展个性化推荐引擎的研究和应用,例如:

百度:百度是国内最大的搜索引擎之一,其推荐算法已经广泛应用于搜索、新闻、视频等多个领域。

腾讯:腾讯是国内领先的互联网科技企业,旗下的微信、QQ等产品拥有庞大的用户群体,其个性化推荐引擎在社交、娱乐、新闻等方面得到了广泛应用。

今日头条:今日头条是国内领先的内容分发平台之一,其个性化推荐算法已经实现了对用户兴趣、行为等多个方面的精准分析和推荐。

淘宝:淘宝是国内最大的电商平台之一,其个性化推荐引擎已经广泛应用于商品推荐、搜索等多个领域。

此外,还有一些专注于个性化推荐技术的创业公司和科技团队正在不断涌现,他们通过开发和应用新的算法和技术,不断推进个性化推荐引擎的发展。

动手实验Apache Spark的最好方式是使用交互式Shell命令行,Spark目前有Python Shell和Scala Shell两种交互式命令行。

可以从 这里下载Apache Spark,下载时选择最近预编译好的版本以便能够立即运行shell。

目前最新的Apache Spark版本是150,发布时间是2015年9月9日。

tar -xvzf ~/spark-150-bin-hadoop24tgz

运行Python Shell

cd spark-150-bin-hadoop24

/bin/pyspark

在本节中不会使用Python Shell进行演示。

Scala交互式命令行由于运行在JVM上,能够使用java库。

运行Scala Shell

cd spark-150-bin-hadoop24

/bin/spark-shell

执行完上述命令行,你可以看到下列输出:

Scala Shell欢迎信息

Welcome to

____ __

/ __/__ ___ _____/ /__

_\ \/ _ \/ _ `/ __/ '_/

/___/ __/\_,_/_/ /_/\_\ version 150

/_/

Using Scala version 2104 (Java HotSpot(TM) 64-Bit Server VM, Java 180_25)

Type in expressions to have them evaluated

Type :help for more information

15/08/24 21:58:29 INFO SparkContext: Running Spark version 150

下面是一些简单的练习以便帮助使用shell。也许你现在不能理解我们做的是什么,但在后面我们会对此进行详细分析。在Scala Shell中,执行下列操作:

在Spark中使用README 文件创建textFileRDD

val textFile = sctextFile("READMEmd")

获取textFile RDD的第一个元素

textFilefirst()

res3: String = # Apache Spark

对textFile RDD中的数据进行过滤操作,返回所有包含“Spark”关键字的行,操作完成后会返回一个新的RDD,操作完成后可以对返回的RDD的行进行计数

筛选出包括Spark关键字的RDD然后进行行计数

val linesWithSpark = textFilefilter(line => linecontains("Spark"))

linesWithSparkcount()

res10: Long = 19

要找出RDD linesWithSpark单词出现最多的行,可以使用下列操作。使用map方法,将RDD中的各行映射成一个数,然后再使用reduce方法找出包含单词数最多的行。

找出RDD textFile 中包含单词数最多的行

textFilemap(line => linesplit(" ")size)

reduce((a, b) => if (a > b) a else b)

res11: Int = 14

返回结果表明第14行单词数最多。

也可以引入其它java包,例如 Mathmax()方法,因为map和reduce方法接受scala函数字面量作为参数。

在scala shell中引入Java方法

import javalangMath

textFilemap(line => linesplit(" ")size)

reduce((a, b) => Mathmax(a, b))

res12: Int = 14

我们可以很容易地将数据缓存到内存当中。

将RDD linesWithSpark 缓存,然后进行行计数

linesWithSparkcache()

res13: linesWithSparktype =

MapPartitionsRDD[8] at filter at <console>:23

linesWithSparkcount()

res15: Long = 19

上面简要地给大家演示的了如何使用Spark交互式命令行。

弹性分布式数据集(RDDs)

Spark在集群中可以并行地执行任务,并行度由Spark中的主要组件之一——RDD决定。弹性分布式数据集(Resilient distributed data, RDD)是一种数据表示方式,RDD中的数据被分区存储在集群中(碎片化的数据存储方式),正是由于数据的分区存储使得任务可以并行执行。分区数量越多,并行越高。下图给出了RDD的表示:

Display- Edit

想像每列均为一个分区(partition ),你可以非常方便地将分区数据分配给集群中的各个节点。

为创建RDD,可以从外部存储中读取数据,例如从Cassandra、Amazon简单存储服务(Amazon Simple Storage Service)、HDFS或其它Hadoop支持的输入数据格式中读取。也可以通过读取文件、数组或JSON格式的数据来创建RDD。另一方面,如果对于应用来说,数据是本地化的,此时你仅需要使用parallelize方法便可以将Spark的特性作用于相应数据,并通过Apache Spark集群对数据进行并行化分析。为验证这一点,我们使用Scala Spark Shell进行演示:

本文主要对SparkSubmit的任务提交流程源码进行分析。 Spark源码版本为231。

首先阅读一下启动脚本,看看首先加载的是哪个类,我们看一下 spark-submit 启动脚本中的具体内容。

可以看到这里加载的类是orgapachesparkdeploySparkSubmit,并且把启动相关的参数也带过去了。下面我们跟一下源码看看整个流程是如何运作的

SparkSubmit的main方法如下

这里我们由于我们是提交作业,所有会走上面的submit(appArgs, uninitLog)方法

可以看到submit方法首先会准备任务提交的环境,调用了prepareSubmitEnvironment,该方法会返回四元组,该方法中会调用doPrepareSubmitEnvironment,这里我们重点注意 childMainClass类具体是什么 ,因为这里涉及到后面启动我们主类的过程。

以下是doPrepareSubmitEnvironment方法的源码

可以看到该方法首先是解析相关的参数,如jar包,mainClass的全限定名,系统配置,校验一些参数,等等,之后的关键点就是根据我们 deploy-mode 参数来判断是如何运行我们的mainClass,这里主要是通过childMainClass这个参数来决定下一步首先启动哪个类。

childMainClass根据部署模型有不同的值:

之后该方法会把准备好的四元组返回,我们接着看之前的submit方法

可以看到这里最终会调用doRunMain()方法去进行下一步。

doRunMain的实现如下

doRunMain方法中会判断是否需要一个代理用户,然后无论需不需要都会执行runMain方法,我们接下来看看runMain方法是如何实现的。

这里我们只假设以集群模式启动,首先会加载类,将我们的childMainClass加载为字节码对象mainClass ,然后将mainClass 映射成SparkApplication对象,因为我们以集群模式启动,那么上一步返回四元组中的childMainClass的参数为ClientApp的全限定名,而这里会调用app实例的start方法因此,这里最终调用的是ClientApp的start方法。

ClientApp的start方法如下

可以看到这里和之前我们的master启动流程有些相似。

可以参考我上一篇文章 Spark源码分析之Master的启动流程 对这一流程加深理解。

首先是准备rpcEnv环境,之后通过master的地址获取masterEndpoints端点相关信息,因为这里运行start方法时会将之前配置的相关参数都传进来,之后就会通过rpcEnv注册相关clientEndPoint端点信息,同时需要注意,这里会把masterEndpoints端点信息也作为构造ClientEndpoint端点的参数,也就是说这个ClientEndpoint会和masterEndpoints通信。

而在我上一篇文章中说过,只要是setupEndpoint方法被调用,一定会调用相关端点的的onStart方法,而这会调用clientEndPoint的onStart方法。

ClientEndPoint类中的onStart方法会匹配launch事件。源码如下

onStart中匹配我们的launch的过程,这个过程是启动driverWrapper的过程,可以看到上面源码中封装了mainClass ,该参数对应DriverWrapper类的全限定名,之后将mainClass封装到command中,然后封装到driverDescription中,向Master申请启动Driver。

这个过程会向Mster发送消息,是通过rpcEnv来实现发射消息的,而这里就涉及到outbox信箱,会调用postToOutbox方法,向outbox信箱中添加消息,然后通过TransportClient的send或sendRpc方法发送消息。发件箱以及发送过程是在同一个线程中进行。

而细心的同学会注意到这里调用的方法名为SendToMasterAndForwardReply,见名之意,发送消息到master并且期待回应。

下面是rpcEnv来实现向远端发送消息的一个调用流程,最终会通过netty中的TransportClient来写出。

之后,Master端会触发receiveAndReply函数,匹配RequestSubmitDriver样例类,完成模式匹配执行后续流程。

可以看到这里首先将Driver信息封装成DriverInfo,然后添加待调度列表waitingDrivers中,然后调用通用的schedule函数。

由于waitingDrivers不为空,则会走LaunchDriver的流程,当前的application申请资源,这时会向worker发送消息,触发Worker的receive方法。

Worker的receive方法中,当Worker遇到LaunchDriver指令时,创建并启动一个DriverRunner,DriverRunner启动一个线程,异步的处理Driver启动工作。这里说启动的Driver就是刚才说的orgapachesparkdeployworkerDriverWrapper

可以看到上面在DriverRunner中是开辟线程异步的处理Driver启动工作,不会阻塞主进程的执行,而prepareAndRunDriver方法中最终调用 runDriver

runDriver中主要先做了一些初始化工作,接着就开始启动driver了。

上述Driver启动工作主要分为以下几步:

下面我们直接看DriverWrapper的实现

DriverWrapper,会创建了一个RpcEndpoint与RpcEnv,RpcEndpoint为WorkerWatcher,主要目的为监控Worker节点是否正常,如果出现异常就直接退出,然后当前的ClassLoader加载userJar,同时执行userMainClass,在执行用户的main方法后关闭workerWatcher。

以上就是SparkSubmit的流程,下一篇我会对SparkContext的源码进行解析。

欢迎关注

从速度的角度看,Spark从流行的MapReduce模型继承而来,可以更有效地支持多种类型的计算,如交互式查询和流处理。速度在大数据集的处理中非常重要,它可以决定用户可以交互式地处理数据,还是等几分钟甚至几小时。Spark为速度提供的一个重要特性是其可以在内存中运行计算,即使对基于磁盘的复杂应用,Spark依然比MapReduce更有效。

从通用性来说,Spark可以处理之前需要多个独立的分布式系统来处理的任务,这些任务包括批处理应用、交互式算法、交互式查询和数据流。通过用同一个引擎支持这些任务,Spark使得合并不同的处理类型变得简单,而合并操作在生产数据分析中频繁使用。而且,Spark降低了维护不同工具的管理负担。

Spark被设计的高度易访问,用Python、Java、Scala和SQL提供简单的API,而且提供丰富的内建库。Spark也与其他大数据工具进行了集成。特别地,Spark可以运行在Hadoop的集群上,可以访问任何Hadoop的数据源,包括Cassandra。

Spark 核心组件

Spark核心组件包含Spark的基本功能,有任务调度组件、内存管理组件、容错恢复组件、与存储系统交互的组件等。Spark核心组件提供了定义弹性分布式数据集(resilient distributed datasets,RDDs)的API,这组API是Spark主要的编程抽象。RDDs表示分布在多个不同机器节点上,可以被并行处理的数据集合。Spark核心组件提供许多API来创建和操作这些集合。

Spark SQLSpark SQL是Spark用来处理结构化数据的包。它使得可以像Hive查询语言(Hive Query Language, HQL)一样通过SQL语句来查询数据,支持多种数据源,包括Hive表、Parquet和JSON。除了为Spark提供一个SQL接口外,Spark SQL允许开发人员将SQL查询和由RDDs通过Python、Java和Scala支持的数据编程操作混合进一个单一的应用中,进而将SQL与复杂的分析结合。与计算密集型环境紧密集成使得Spark SQL不同于任何其他开源的数据仓库工具。Spark SQL在Spark 10版本中引入Spark。

Shark是一个较老的由加利福尼亚大学和伯克利大学开发的Spark上的SQL项目,通过修改Hive而运行在Spark上。现在已经被Spark SQL取代,以提供与Spark引擎和API更好的集成。

Spark流(Spark Streaming)Spark流作为Spark的一个组件,可以处理实时流数据。流数据的例子有生产环境的Web服务器生成的日志文件,用户向一个Web服务请求包含状态更新的消息。Spark流提供一个和Spark核心RDD API非常匹配的操作数据流的API,使得编程人员可以更容易地了解项目,并且可以在操作内存数据、磁盘数据、实时数据的应用之间快速切换。Spark流被设计为和Spark核心组件提供相同级别的容错性,吞吐量和可伸缩性。

MLlibSpark包含一个叫做MLlib的关于机器学习的库。MLlib提供多种类型的机器学习算法,包括分类、回归、聚类和协同过滤,并支持模型评估和数据导入功能。MLlib也提供一个低层的机器学习原语,包括一个通用的梯度下降优化算法。所有这些方法都可以应用到一个集群上。

GraphXGraphX是一个操作图(如社交网络的好友图)和执行基于图的并行计算的库。与Spark流和Spark SQL类似,GraphX扩展了Spark RDD API,允许我们用和每个节点和边绑定的任意属性来创建一个有向图。GraphX也提供了各种各样的操作图的操作符,以及关于通用图算法的一个库。

集群管理器Cluster Managers在底层,Spark可以有效地从一个计算节点扩展到成百上千个节点。为了在最大化灵活性的同时达到这个目标,Spark可以运行在多个集群管理器上,包括Hadoop YARN,Apache Mesos和一个包含在Spark中的叫做独立调度器的简易的集群管理器。如果你在一个空的机器群上安装Spark,独立调度器提供一个简单的方式;如果你已经有一个Hadoop YARN或Mesos集群,Spark支持你的应用允许在这些集群管理器上。第七章给出了不同的选择,以及如何选择正确的集群管理器。

谁使用Spark用Spark做什么

由于Spark是一个面向集群计算的通用框架,可用于许多不同的应用。使用者主要有两种:数据科学家和数据工程师。我们仔细地分析一下这两种人和他们使用Spark的方式。明显地,典型的使用案例是不同的,但我们可以将他们粗略地分为两类,数据科学和数据应用。

数据科学的任务数据科学,近几年出现的一门学科,专注于分析数据。尽管没有一个标准的定义,我们认为一个数据科学家的主要工作是分析和建模数据。数据科学家可能会SQL,统计学,预测模型(机器学习),用Python、MATLAB或R编程。数据科学家能将数据格式化,用于进一步的分析。

数据科学家为了回答一个问题或进行深入研究,会使用相关的技术分析数据。通常,他们的工作包含特殊的分析,所以他们使用交互式shell,以使得他们能在最短的时间内看到查询结果和代码片段。Spark的速度和简单的API接口很好地符合这个目标,它的内建库意味着很多算法可以随时使用。

Spark通过若干组件支持不同的数据科学任务。Spark shell使得用Python或Scala进行交互式数据分析变得简单。Spark SQL也有一个独立的SQL shell,已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-,需要的小伙伴可以点击它可以用SQL进行数据分析,也可以在Spark程序中或Spark shell中使用Spark SQL。MLlib库支持机器学习和数据分析。而且,支持调用外部的MATLAB或R语言编写的程序。Spark使得数据科学家可以用R或Pandas等工具处理包含大量数据的问题。

第一阶段:熟练的掌握Scala语言

1,Spark框架是采用Scala语言编写的,精致而优雅。要想成为Spark高手,你就必须阅读Spark的源代码,就必须掌握Scala,;

2, 虽然说现在的Spark可以采用多语言Java、Python等进行应用程序开发,但是最快速的和支持最好的开发API依然并将永远是Scala方式的API,所以你必须掌握Scala来编写复杂的和高性能的Spark分布式程序;

3, 尤其要熟练掌握Scala的trait、apply、函数式编程、泛型、逆变与协变等;

第二阶段:精通Spark平台本身提供给开发者API

1, 掌握Spark中面向RDD的开发模式,掌握各种transformation和action函数的使用;

2, 掌握Spark中的宽依赖和窄依赖以及lineage机制;

3, 掌握RDD的计算流程,例如Stage的划分、Spark应用程序提交给集群的基本过程和Worker节点基础的工作原理等

第三阶段:深入Spark内核

此阶段主要是通过Spark框架的源码研读来深入Spark内核部分:

1, 通过源码掌握Spark的任务提交过程;

2, 通过源码掌握Spark集群的任务调度;

3, 尤其要精通DAGScheduler、TaskScheduler和Worker节点内部的工作的每一步的细节;

第四阶级:掌握基于Spark上的核心框架的使用

Spark作为云计算大数据时代的集大成者,在实时流处理、图技术、机器学习、NoSQL查询等方面具有显著的优势,我们使用Spark的时候大部分时间都是在使用其上的框架例如Shark、Spark Streaming等:

1, Spark Streaming是非常出色的实时流处理框架,要掌握其DStream、transformation和checkpoint等;

2, Spark的离线统计分析功能,Spark 100版本在Shark的基础上推出了Spark SQL,离线统计分析的功能的效率有显著的提升,需要重点掌握;

3, 对于Spark的机器学习和GraphX等要掌握其原理和用法;

第五阶级:做商业级别的Spark项目

通过一个完整的具有代表性的Spark项目来贯穿Spark的方方面面,包括项目的架构设计、用到的技术的剖析、开发实现、运维等,完整掌握其中的每一个阶段和细节,这样就可以让您以后可以从容面对绝大多数Spark项目。

第六阶级:提供Spark解决方案

1, 彻底掌握Spark框架源码的每一个细节;

2, 根据不同的业务场景的需要提供Spark在不同场景的下的解决方案;

3, 根据实际需要,在Spark框架基础上进行二次开发,打造自己的Spark框架;

欢迎分享,转载请注明来源:浪漫分享网

原文地址:https://hunlipic.com/qinggan/7717186.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-09-07
下一篇2023-09-07

发表评论

登录后才能评论

评论列表(0条)

    保存