Last semester a 大创 (College Student Innovation and Entrepreneurship Training Program) round opened for applications, and I spotted a project topic that interested me: comment data analysis.
I signed up immediately, contacted the supervisor, got the go-ahead, and my 大创 journey began.
After two and a half weeks of grinding on the project, I finally had 15,000 bilibili comments scraped, analyzed, and visualized.

Result screenshots

Below is my code, which I hope can serve as a reference for other 大创 projects.
The pipeline starts with an AI workflow built on Coze, which lets you assemble an AI interface in a low-code, modular way.
Calling that workflow returns BV IDs (bilibili video IDs) related to a given topic; a usage sketch follows the code.

"""
This example describes how to use the workflow interface to chat.
"""

import os
# Our official coze sdk for Python [cozepy](https://github.com/coze-dev/coze-py)
from cozepy import COZE_CN_BASE_URL
from pycparser.plyparser import parameterized

# Get an access_token through personal access token or oauth.
coze_api_token = 'pat_zcqBp7ZcnLdmTdL2YYYaXQFERmCnasLb7l312gibI54kj0q6hMGYoAx4aZuTi4A5'
# The default access is api.coze.com, but if you need to access api.coze.cn,
# please use base_url to configure the api endpoint to access
coze_api_base = COZE_CN_BASE_URL

from cozepy import Coze, TokenAuth, Message, ChatStatus, MessageContentType # noqa
def wordfloww(input_data):
# Init the Coze client through the access_token.
coze = Coze(auth=TokenAuth(token=coze_api_token), base_url=coze_api_base)
parameters= {
"input": input_data,
}
# Create a workflow instance in Coze, copy the last number from the web link as the workflow's ID.
workflow_id = '7508700351517900837'

# Call the coze.workflows.runs.create method to create a workflow run. The create method
# is a non-streaming chat and will return a WorkflowRunResult class.
workflow = coze.workflows.runs.create(
workflow_id=workflow_id,
parameters=parameters
)
return workflow.data
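
For reference, here is a hypothetical usage sketch of this module (saved as url_workflow.py, which the CSV step below imports). It assumes, as that step also does, that the workflow returns a JSON string whose "output" field holds newline-separated BV IDs; the exact payload depends on how your Coze workflow is configured.

# Hypothetical usage sketch -- the exact payload depends on your Coze workflow.
import json

result = run_workflow('AI')  # e.g. '{"output": "BV1...\nBV1..."}'
bv_ids = json.loads(result)["output"].split("\n")
print(bv_ids)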

Next, write the BV IDs into a CSV file:

# -*- coding: utf-8 -*-
import csv
import json
from json.decoder import JSONDecodeError

import requests

import url_workflow

BV2AV_API = 'https://api.bilibili.com/x/web-interface/view'
HEADER = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                        'Chrome/80.0.3987.149 Safari/537.36'}


def bv_to_av(bv):
    """Resolve a BV ID to its numeric av ID via bilibili's view API."""
    r = requests.get(BV2AV_API, {'bvid': bv}, headers=HEADER)
    response = decode_json(r)
    try:
        return str(response['data']['aid'])
    except (KeyError, TypeError):
        return '获取av号失败'


def decode_json(r):
    try:
        response = r.json()
    except JSONDecodeError:
        # We call requests' json() method, but the exception it raises comes from the json module.
        return -1
    else:
        return response


if __name__ == '__main__':
    a = url_workflow.run_workflow(input('请输入主题'))

    bv_list = []
    try:
        data = json.loads(a)                # parse the JSON string
        output_value = data["output"]       # take the "output" field
        bv_list = output_value.split("\n")  # split on newlines into a list
    except json.JSONDecodeError:
        print("输入不是有效的 JSON")

    csv_file = 'bv.csv'
    with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(["BV号"])  # header row; the comments themselves go into data.csv later
        for bv in bv_list:
            writer.writerow([bv])

    print(f'bv号已经写入{csv_file}文件,内容为{bv_list}')
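
One caveat before crawling: if the workflow's reply contains blank lines or stray explanatory text, invalid rows land in bv.csv and fail later. A small filter helps; this is just a sketch, assuming the standard BV format of "BV" followed by 10 alphanumeric characters (e.g. BV1GJ411x7h7):

import re

def looks_like_bv(s):
    # A BV ID is "BV" followed by 10 alphanumeric characters.
    return re.fullmatch(r'BV[0-9A-Za-z]{10}', s.strip()) is not None

bv_list = [bv for bv in bv_list if looks_like_bv(bv)]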


This is the bilibili comment crawler. Using the BV IDs collected above, it writes each comment and its like count into data.csv.
Do not remove the sleep call; without it, bilibili's risk control may block your account's requests for a while.


import csv
import hashlib
import json
import random
import time
from json.decoder import JSONDecodeError
from urllib.parse import quote

import requests

BV2AV_API = 'https://api.bilibili.com/x/web-interface/view'
HEADER = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                        'Chrome/80.0.3987.149 Safari/537.36'}


def bv_to_av(bv):
    """Resolve a BV ID to its numeric av ID via bilibili's view API."""
    r = requests.get(BV2AV_API, {'bvid': bv}, headers=HEADER)
    response = decode_json(r)
    try:
        return str(response['data']['aid'])
    except (KeyError, TypeError):
        return '获取av号失败'


def decode_json(r):
    try:
        response = r.json()
    except JSONDecodeError:
        # We call requests' json() method, but the exception it raises comes from the json module.
        return -1
    else:
        return response


def GetSign(wts, NextPage):
    """Build the w_rid signing parameter."""
    pagination_str = '{"offset": %s}' % NextPage
    # Parameters that go into the signature
    l = [
        "mode=2",
        f"oid={av}",
        f"pagination_str={quote(pagination_str)}",
        "plat=1",
        "type=1",
        "web_location=1315875",
        f"wts={wts}"
    ]
    # Join the list into a query string
    y = '&'.join(l)
    # Append the salt
    string = y + 'ea1db124af3c7062474693fa704f4ff8'
    # Hash with md5 to get w_rid
    MD5 = hashlib.md5()
    MD5.update(string.encode('utf-8'))
    w_rid = MD5.hexdigest()
    print(w_rid)
    return w_rid, pagination_str


def GetContent(NextPage):
    """Request one page of comments and write them to the CSV."""
    # Pretend to be a browser
    headers = {
        # user-agent identifies the browser/device
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
    }
    # Request url
    url = 'https://api.bilibili.com/x/v2/reply/wbi/main'
    # Current timestamp
    wts = int(time.time())
    # Compute the w_rid signing parameter
    w_rid, pagination_str = GetSign(wts=wts, NextPage=NextPage)
    print(pagination_str)
    # Query parameters
    data = {
        'oid': av,
        'type': '1',
        'mode': '2',
        'pagination_str': pagination_str,
        'plat': '1',
        'web_location': '1315875',
        'w_rid': w_rid,
        'wts': wts,
    }
    # Send the request and parse the JSON response into a dict
    response = requests.get(url=url, params=data, headers=headers)
    json_data = response.json()

    # First extraction: the list holding the comments (may be None on the last page)
    replies = json_data['data']['replies'] or []
    # Loop over the list and pull out each comment
    for index in replies:
        dit = {
            '点赞': index['like'],
            '内容': index['content']['message'].replace('\n', ''),
        }
        # Write one row
        csv_writer.writerow(dit)
        print(dit)
    try:
        next_offset = json_data['data']['cursor']['pagination_reply']['next_offset']
        pagination_str = json.dumps(next_offset)
        print(pagination_str)
        return pagination_str
    except (KeyError, TypeError):
        print("已到达最后一页或数据结构异常")
        return -1


out_file = open('data.csv', mode='w', encoding='utf-8-sig', newline='')
csv_writer = csv.DictWriter(out_file, fieldnames=[
    '点赞',
    '内容',
])
csv_writer.writeheader()

with open('bv.csv', mode='r', encoding='utf-8-sig') as f:
    reader = csv.DictReader(f)
    for row in reader:
        bv = row['BV号']
        print(f'正在采集BV号:{bv}的评论内容')

        av = bv_to_av(bv)
        NextPage = '""'
        for page in range(1, 100):
            print(f'正在采集第{page}页的数据内容')
            NextPage = GetContent(NextPage=NextPage)
            if NextPage == -1:
                break
            time.sleep(random.uniform(1, 5))

out_file.close()
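
The random 1-5 second sleep above is the main politeness measure. If requests still get throttled, one hedged option is a small retry-with-backoff wrapper around requests.get; this is only a sketch, and the exact behaviour of bilibili's risk control (status codes, error bodies) is an assumption here:

import time
import requests

def get_with_backoff(url, params, headers, retries=3):
    """Retry a GET with exponentially growing pauses (2s, 4s, 8s ...)."""
    response = None
    for attempt in range(retries):
        response = requests.get(url, params=params, headers=headers)
        if response.status_code == 200:
            return response
        time.sleep(2 ** (attempt + 1))  # back off before retrying
    return response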

This is my NLP module, used to classify each comment's sentiment as positive or negative:

import csv

from snownlp import SnowNLP

texts = []


def read_file():
    with open('../data.csv', 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        next(reader)  # skip the header row
        for row in reader:
            texts.append(row[1])  # column 1 is the comment text ('内容')


def emotion():
    num = 0
    positive = 0
    negative = 0
    neutral = 0
    for text in texts:
        s = SnowNLP(text)
        sentiment_score = s.sentiments
        print(f"文本: {text}")
        print(f"情感分数: {sentiment_score:.4f}")
        # Map the score to a sentiment label
        sentiment_label = '积极' if sentiment_score > 0.6 else '消极' if sentiment_score < 0.4 else '中性'
        print(f"情感倾向: {sentiment_label}")
        num += 1
        if sentiment_score > 0.6:
            positive += 1
        elif sentiment_score < 0.4:
            negative += 1
        else:
            neutral += 1
    return [positive, neutral, negative, num]


if __name__ == '__main__':
    read_file()
    print(f'分数为{emotion()}')
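
Since the project's end goal is visualization, the [positive, neutral, negative, total] list returned by emotion() can feed a chart directly. A minimal sketch with matplotlib (an assumption: matplotlib is not used elsewhere in the project, so install it separately):

import matplotlib.pyplot as plt

read_file()
positive, neutral, negative, total = emotion()
# Pie chart of the sentiment distribution.
plt.pie([positive, neutral, negative],
        labels=['positive', 'neutral', 'negative'],
        autopct='%1.1f%%')
plt.title(f'Sentiment of {total} comments')
plt.show()
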
This is the topic-analysis module, which performs topic modelling on the comments and visualizes the result. (The project calls it "lda", but the code actually uses BERTopic rather than classic LDA.)
import csv
import re

import jieba
from bertopic import BERTopic
from sentence_transformers import SentenceTransformer

# 1. Data cleaning and preprocessing: keep only comments that mention an AI-related keyword
keywords = {
    # longer terms
    '大模型', '生成式AI', 'chatgpt', 'deepseek', '人工智能',
    '机器学习', '神经网络', '深度学习', '自然语言处理', '计算机视觉',

    # shorter terms
    'AI', 'GPT', 'LLM', 'CV', 'NLP', '模型', '算法', '训练', '推理', '参数', '调参',
    '识别', '生成', '预测', '推荐', '智能', '自动', '替代', '就业'
}

raw_data = []
with open('../data.csv', mode='r', encoding='utf-8-sig') as f:
    reader = csv.DictReader(f)
    for row in reader:
        if any(keyword in row['内容'] for keyword in keywords):
            print(row)
            raw_data.append(row)

print(raw_data)
embedding_model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")


# 2. Helpers for filtering and cleaning individual comments
def is_valid_text(text):
    """Decide whether a comment carries real content."""
    # Common throwaway phrases
    meaningless_keywords = {"三连", "666", "点赞", "关注", "转发", "支持一下", "一键三连"}
    # Drop the comment if it consists only of such phrases
    words = set(jieba.cut(text))
    if words.issubset(meaningless_keywords):
        return False
    # Drop comments that are too short
    if len(text.strip()) < 5:
        return False
    return True


def preprocess_text(text):
    """Clean the text: strip emoticon tags, @-mentions and extra whitespace."""
    text = re.sub(r'\[.*?]', '', text)        # remove [emoticon] tags
    text = re.sub(r'@\S+', '', text)          # remove @-mentions
    text = re.sub(r'\s+', ' ', text).strip()  # collapse whitespace
    return text


# 3. Extract the valid comments
documents = []
for item in raw_data:
    if isinstance(item, dict) and '内容' in item:
        cleaned_text = preprocess_text(item['内容'])
        if is_valid_text(cleaned_text):  # make sure it is not empty
            documents.append(cleaned_text)

print(f"提取到 {len(documents)} 条有效评论")


# 4. Build the model
model = BERTopic(language="chinese (simplified)",
                 nr_topics='auto',
                 embedding_model=embedding_model)

# 5. Train the model
topics, probs = model.fit_transform(documents)

# 6. Inspect the result
topic_info = model.get_topic_info()
print("\n主题分布:")
print(topic_info)


def generate_topic_labels(model, top_n=3):
    """Label each topic with its top_n keywords joined by underscores."""
    topic_labels = {}
    for topic_id in model.get_topic_info()['Topic']:
        if topic_id == -1:
            continue  # skip the outlier topic
        keywords = model.get_topic(topic_id)
        if keywords and len(keywords) >= top_n:
            label = "_".join([word for word, _ in keywords[:top_n]])
            topic_labels[topic_id] = label
        else:
            topic_labels[topic_id] = f"topic_{topic_id}"
    return topic_labels


# Generate labels automatically and attach them to the model
topic_labels = generate_topic_labels(model, top_n=3)
model.set_topic_labels(topic_labels)

# 7. Visualization
fig1 = model.visualize_topics(custom_labels=True)  # use the custom labels
fig2 = model.visualize_barchart(top_n_topics=30, width=400, height=200)
fig1.show()
fig2.show()
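
The figures returned by visualize_topics and visualize_barchart are plotly figures, so they can also be saved as standalone interactive HTML files for the project report (the file names here are arbitrary):

# Save the interactive charts as standalone HTML files.
fig1.write_html('topic_map.html')
fig2.write_html('topic_barchart.html')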

Finally, I want to thank my supervisor and the great open-source spirit of the internet, which let a complete beginner like me build this project from scratch so quickly.