黑马程序员Python 学习

Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

Spark编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。
PySpark的编程,主要分为如下三大步骤:

  1. 数据输入,通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象。
  2. 数据处理计算,通过RDD类对象的成员方法完成各种数据计算的需求
  3. 数据输出,将处理完成后的RDD对象调用各种成员方法完成、写出文件、转换为list等操作
1
2
3
4
5
6
7
8
9
10
11
12
#导包
from pyspark import SparkConf, SparkContext

#创建Sparkconf类对象
cof = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

#基于SparkConf对象创建Sparltext对象
sc = SparkContext(conf=cof)

print(sc.version)
#停止SparkContxt对象运行
sc.stop()

读取文件并输出内容

1
2
rdd = sc.textFile("D:/hello.txt")
print(rdd.collect())

对数据进行查找处理的简单案例

通过简单学习可以看出在处理数据上python确实比其他方便的多啊

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#导包
from pyspark import SparkConf, SparkContext
#设置环境
import os
import json
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.6/python.exe"
#创建Sparkconf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")

#基于SparkConf对象创建Sparltext对象
sc = SparkContext(conf=conf)
rdd = sc.textFile("F:/cs资料库/Python/第15章资料/资料/orders.txt")
#提取json字符串
json_rdd = rdd.flatMap(lambda x:x.split("|"))

#将每一个json转为字典
dict_rdd = json_rdd.map(lambda x:json.loads(x))
#取出城市和销售额
city_with_money = dict_rdd.map(lambda x:(x['areaName'], int(x['money'])))
city_res_rdd = city_with_money.reduceByKey(lambda a,b:a+b)
res1 = city_res_rdd.sortBy(lambda x:x[1],ascending=False, numPartitions=1)#按照销售额大小排序
print("需求1结果:",res1.collect())

#需求2:全部城市商品售卖目录
categroy_rdd = dict_rdd.map(lambda x:x['category']).distinct()
print("需求2结果:",categroy_rdd.collect())

#需求3:查看北京有什么商品
Bj_data_rdd= dict_rdd.filter(lambda x:x['areaName']=='北京')
res3 = Bj_data_rdd.map(lambda x:x['category']).distinct()
print("需求3结果:",res3.collect())
#print(dict_rdd.collect())

#停止SparkContxt对象运行
sc.stop()

hadoop

使用spark输出依靠Hadoop,注意如果已经存在文件会报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#导包
from pyspark import SparkConf, SparkContext
#设置环境
import os
os.environ['HADOOP_HOME'] = "F:/Python_hadoop/hadoop-3.0.0"
import json
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.6/python.exe"
#创建Sparkconf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism","1") #避免分区过多导致输出过多文件
#基于SparkConf对象创建Sparltext对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,6])

rdd.saveAsTextFile("D:/output1")

#停止SparkContxt对象运行
sc.stop()

综合案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#导包
from pyspark import SparkConf, SparkContext
#设置环境
import os
os.environ['HADOOP_HOME'] = "F:/Python_hadoop/hadoop-3.0.0"
import json
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.6/python.exe"

#创建Sparkconf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism","1")
#基于SparkConf对象创建Sparltext对象
sc = SparkContext(conf=conf)
#读取文件转换成RDD
file_rdd = sc.textFile("F:/cs资料库/Python/第15章资料/资料/search_log.txt")

#需求1:热门搜索时间段top3
res1 = file_rdd.map(lambda x:x.split("\t")).\
map(lambda x:x[0][:2]).\
map(lambda x:(x,1)).\
reduceByKey(lambda a,b:a+b).\
sortBy(lambda x:x[1],ascending=False,numPartitions=1).take(3)
#上面等价于下面这种写法
# file_rdd.map(lambda x:(x.split("\t")[0][:2],1)).\
# reduceByKey(lambda a,b:a+b).\
# sortBy(lambda x:x[1],ascending=False,numPartitions=1).take(3)

print("需求1:",res1)

#需求2:热门搜索词Top3
res2 = file_rdd.map(lambda x: (x.split("\t")[2],1)).\
reduceByKey(lambda a,b:a+b).\
sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
take(3)
print("需求2:",res2)

#需求三 统计黑马程序员关键字在什么时段搜索最多
res3 = file_rdd.map(lambda x: x.split("\t")).\
filter(lambda x: x[2] == '黑马程序员').\
map(lambda x: x[0][:2],1).\
reduceByKey(lambda a, b: a+b).\
sortBy(lambda x: x[1], ascending=False,numPartitions=1).take(1)
print("需求3:", res3)

#需求4 将数据转换为JSON写到文件中
file_rdd.map(lambda x:x.split("\t")).\
map(lambda x:{"time":x[0],"user_id":x[1],"key_word":x[2],"rank1":x[3],"rank2":x[4],"url":x[5]}).\
saveAsTextFile("D:/output_json")

sc.stop()

闭包

在函数嵌套的前提下,内部函数使用了外部函数的变量,并且外部函数返回了内部函数,我们把这个使用外部函数变量的内部函数称为闭包。可以避免全局变量被修改的风险

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#使用闭包实现存钱取钱小案例
def account_creat(initial_amount = 0):
def atm(num,deposit=True):
nonlocal initial_amount
if deposit:
initial_amount+=num
print(f"存款:{num},账户余额:{initial_amount}")
else:
initial_amount-=num
print(f"存款:{num},账户余额:{initial_amount}")
return atm
atm = account_creat()
atm(100)
atm(200)
atm(100,deposit=False)

优点,使用闭包可以让我们得到:
无需定义全局变量即可实现通过函数,持续的访问、修改某个值
闭包使用的变量的所用于在函数内,难以被错误的调用修改

缺点:
由于内部函数持续引用外部函数的值,所以会导致这一部分内存空间不被释放,一直占用内存

装饰器

装饰器其实也是一种闭包, 其功能就是在不破坏目标函数原有的代码和功能的前提下,为目标函数增加新功能。
实例写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

def outer(func):
def inner():
print("我睡觉了")
func()
print("我起床了")

return inner

@outer
def sleep():
import random
import time
print("zzzzz.....")
time.sleep(random.randint(1,5))


sleep()

设计模式

设计模式是一种编程套路,可以极大的方便程序的开发。最常见、最经典的设计模式,就是我们所学习的面向对象了。除了面向对象外,在编程中也有很多既定的套路可以方便开发,我们称之为设计模式:单例、工厂模式建造者、责任链、状态、备忘录、解释器、访问者、观察者、中介、模板、代理模式等等模式。

单例模式(Singleton Pattern)是一种常用的软件设计模式,该模式的主要目的是确保某一个类只有一个实例存在。
在整个系统中,某个类只能出现一个实例时,单例对象就能派上用场。
定义: 保证一个类只有一个实例,并提供一个访问它的全局访问点
适用场景:当一个类只能有一个实例,而客户可以从一个众所周知的访问点访问它时。

当需要大量创建一个类的实例的时候, 可以使用工厂模式。即,从原生的使用类的构造去创建对象的形式迁移到,基于工厂提供的方法去创建对象的形式。

•使用工厂类的get_person()方法去创建具体的类对象

优点:

•大批量创建对象的时候有统一的入口,易于代码维护

•当发生修改,仅修改工厂类的创建方法即可

•符合现实世界的模式,即由工厂来制作产品(对象)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

class person:
pass
class worker(person):
pass
class student(person):
pass
class teacher(person):
pass
class personfactory:
def get_person(self,p_type):
if p_type == 'w':
return worker()
elif p_type == 's':
return student()
else:
return teacher()

pf = personfactory()
worker = pf.get_person('w')

进程和线程

现代操作系统比如Mac OS X,UNIX,Linux,Windows等,都是支持“多任务”的操作系统。
进程: 就是一个程序,运行在系统之上,那么便称之这个程序为一个运行进程,并分配进程ID方便系统管理。
线程:线程是归属于进程的,一个进程可以开启多个线程,执行不同的工作,是进程的实际工作最小单位。

注意:进程之间是内存隔离的, 即不同的进程拥有各自的内存空间。 线程之间是内存共享的,线程是属于进程的,一个进程内的多个线程之间是共享这个进程所拥有的内存空间的。

多线程编程案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time
import threading
def sing():
while True:
print("sing>>")
time.sleep(1)


def dance():
while True:
print("dance<<")
time.sleep(1)

if __name__=="__main__":
sing_thread=threading.Thread(target=sing)
dance_thread = threading.Thread(target=dance)

#让线程运行
sing_thread.start()
dance_thread.start()

网络编程

服务端开发

socket (简称 套接字) 是进程之间通信一个工具,好比现实生活中的插座,所有的家用电器要想工作都是基于插座进行,进程之间想要进行网络通信需要socket。
Socket负责进程之间的网络数据传输,好比数据的搬运工。

2个进程之间通过Socket进行相互通讯,就必须有服务端和客户端Socket服务端:等待其它进程的连接、可接受发来的消息、可以回复消息;Socket客户端:主动连接服务端、可以发送消息、可以接收回复
客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#创建Socket对象
import socket

socket_server = socket.socket()
#绑定Ip地址和端口
socket_server.bind(("localhost", 8888))
#监听端口
socket_server.listen(1) #表示接收数量

#等待客户端连接
#result = socket_server.accept()
# conn = result[0] #客户端和服务端的连接对象
# address = result[1] #客户端的地址信息
conn,address = socket_server.accept() #等于上面的写法
#accept()是阻塞的方法-等待客户段连接如果没有连接就卡在这里不执行

print(f"接收到了客户端的连接,客户端消息是:{address}")

while True:
# 接收客户端信息,要使用客户端和服务端本次连接对象,而非socket——sever对象
data : str= conn.recv(1024).decode("UTF-8")

print(f"客户端发来的消息是:{data}")

#发送回复消息
msg = input("请输入回复消息")
if msg == 'quit':
break
conn.send(msg)

#关闭连接
conn.close()
socket_server.close()

服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#创建Socket对象
import socket

socket_client = socket.socket()

#连接到服务器
socket_client.connect(("localhost",8888))
while True:
#发送消息
msg=input("请输入发送给服务端的消息:")
if msg=="quit":
break
socket_client.send(msg.encode("UTF-8"))

#接收返回消息
recv_data = socket_client.recv(1024)#1024是缓冲区的大小
print(f"服务端回复的消息是:{recv_data.decode('UTF-8')}")
socket_client.close()

正则表达式

正则表达式,又称规则表达式(Regular Expression),是使用单个字符串来描述、匹配某个句法规则的字符串,常被用来检索、替换那些符合某个模式(规则)的文本。简单来说,正则表达式就是使用:字符串定义规则,并通过规则去验证字符串是否匹配。
Python正则表达式,使用re模块,并基于re模块中三个基础方法来做正则匹配。分别是:match、search、findall 三个基础方法
元字符匹配
点击查看大佬说明

1
2
3
4
5
import re

s = 'python itheima@@sl;djko1112328sjp@2'
result = re.findall('\d',s)
print(result)

相对复杂的案例
1
2
3
4
5
import re
#匹配只有字母和数字组成长度6-10位
s = 'python itheima@@sl;djko1112328sjp@2'
r = '^[0-9a-zA-Z]{6,10}'
print(re.findall(r,s))

递归

这个就不写了,具体应该能在Acwing-基础篇里有,毕竟是算法里比较重要的了还是多练为好。

总结

目前是23年1月1日元旦节的下午,艰苦的一年终于要开始了,希望我能坚持下去这一年实现我考上一个好研究生的梦想。在未来这一年里可能不会学除了考研外的东西了,黑马的Python应该还没有鹿丸,等以后有机会再来写吧。