搭建Spark计算平台+python操作Spark

一、Spark安装及服务启动

Apache Spark是一种快速的集群计算技术,专为快速计算而设计。它基于Hadoop MapReduce,它扩展了MapReduce模型,以有效地将其用于更多类型的计算,包括交互式查询和流处理。 Spark的主要特性是它的内存中集群计算,提高了应用程序的处理速度(Spark 因为 RDD 是基于内存的,可以比较容易切成较小的块来处理。如果能对这些小块处理得足够快,就能达到低延时的效果)。


比起 Hadoop MapReduce, Spark 本质上就是基于内存的更快的批处理,然后用足够快的批处理来实现各种场景

1、安装Scala

下载并解压Scala

cd /opt/scala
wget https://downloads.lightbend.com/scala/2.11.7/scala-2.11.7.tgz
tar -zxf scala-2.11.7.tgz

将Scala添加到环境变量

vi /etc/profile

在最后面添加

export SCALA_HOME=/opt/scala/scala-2.11.7
export PATH=$PATH:$SCALA_HOME/bin

激活配置

source /etc/profile

2、Spark下载

官网下载和自己hadoop版本相匹配的spark安装包

3、解压安装文件并配置环境变量

(1)解压安装文件
解压安装文件到指定的的文件夹 /opt/spark

tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz -C  opt/spark

修改文件夹名字

cd /opt/spark/
mv spark-2.2.0-bin-hadoop2.7 spark-2.2.0

(2)配置环境变量

export SPARK_HOME=/opt/spark/spark-2.2.0
export PATH=$SPARK_HOME/bin:$PATH

4、配置Spark

需要修改的配置文件有两个
spark-env.sh,spark-defaults.conf
(1)配置spark-env.sh

cd /opt/spark/spark-2.2.0/conf
mv spark-env.sh.template spark-env.sh
vim spark-env.sh

配置如下内容

# 配置JAVA_HOME
export JAVA_HOME=/opt/java/jdk1.8.0_144
# 配置SCALA_HOME
export SCALA_HOME=/opt/scala/scala-2.11.7
# 配置HADOOP
export HADOOP_HOME=/opt/hadoop/hadoop-2.7.6/
export HADOOP_CONF_DIR=/opt/hadoop/hadoop-2.7.6/etc/hadoop

#定义管理端口
export SPARK_MASTER_WEBUI_PORT=8088
#定义master域名和端口
export SPARK_MASTER_HOST=spark-master
export SPARK_MASTER_PORT=7077  # 提交Application的端口
export SPARK_MASTER_IP=10.141.211.80

# 定义work节点的管理端口
export SPARK_WORKER_WEBUI_PORT=8088
# 每一个Worker最多可以使用的cpu core的个数,真实服务器如果有32个,可以设置为32个
export SPARK_WORKER_CORES=10
# 每一个Worker最多可以使用的内存,真实服务器如果有128G,你可以设置为100G
export SPARK_WORKER_MEMORY=4g

(2)配置spark-defaults.conf

cd /opt/spark/spark-2.2.0/conf
vim spark-defaults.conf

配置如下内容

spark.eventLog.enabled=true
spark.eventLog.compress=true

# 保存在本地
# spark.eventLog.dir=file://opt/hadoop/hadoop-2.7.6/logs/userlogs
# spark.history.fs.logDirectory=file://opt/hadoop/hadoop-2.7.6/logs/userlogs

# 保存在hdfs上
spark.eventLog.dir=hdfs://spark-master:9000/tmp/logs/root/logs
spark.history.fs.logDirectory=hdfs://spark-master:9000/tmp/logs/root/logs
spark.yarn.historyServer.address=spark-master:18080

5、启动Spark

sbin/start-all.sh 

二、PySpark安装

pyspark是用来对接 spark的 Python 库

pip install pyspark

三、使用pyspark

1、SparkContext声明

SparkContext是任何spark功能的入口点。

from pyspark import SparkContext
sc = SparkContext("local", "First App")

2、一些基本操作

(1)count
返回RDD中的元素个数

from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
counts = words.count()
print("Number of elements in RDD -> %i" % counts)

(2)collect
返回RDD中的所有元素

from pyspark import SparkContext
sc = SparkContext("local", "collect app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
coll = words.collect()
print("Elements in RDD -> %s" % coll)

(3)foreach
仅返回满足foreach内函数条件的元素。在下面的示例中,我们在foreach中调用print函数,该函数打印RDD中的所有元素。

from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)

(4)filter
返回一个包含元素的新RDD,它满足过滤器内部的功能。在下面的示例中,我们过滤掉包含''spark'的字符串。

from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))

(5)map
通过将该函数应用于RDD中的每个元素来返回新的RDD。在下面的示例中,我们形成一个键值对,并将每个字符串映射为值1

from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))

(6)reduce
执行指定的可交换和关联二元操作后,将返回RDD中的元素。在下面的示例中,我们从运算符导入add包并将其应用于'num'以执行简单的加法运算。说白了和Python的reduce一样:假如有一组整数[x1,x2,x3],利用reduce执行加法操作add,对第一个元素执行add后,结果为sum=x1,然后再将sum和x2执行add,sum=x1+x2,最后再将x2和sum执行add,此时sum=x1+x2+x3。

from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))

(7)join
它返回RDD,其中包含一对带有匹配键的元素以及该特定键的所有值。

from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print( "Join RDD -> %s" % (final))

官方文档

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 158,560评论 4 361
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 67,104评论 1 291
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 108,297评论 0 243
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 43,869评论 0 204
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 52,275评论 3 287
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 40,563评论 1 216
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 31,833评论 2 312
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 30,543评论 0 197
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 34,245评论 1 241
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 30,512评论 2 244
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 32,011评论 1 258
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 28,359评论 2 253
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 33,006评论 3 235
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 26,062评论 0 8
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 26,825评论 0 194
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 35,590评论 2 273
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 35,501评论 2 268

推荐阅读更多精彩内容