Learn Python with Steem #10 #11 笔记


划重点

  • 获取用户的文章列表

    通过 steem.Steem.get_account_history 获取用户的动态,从中筛选出文章的 permlink。

    再用 steem.Steem.get_content 方法获取每篇文章的详细信息。

    其实也可以用 steem.blog.Blog 获取用户所有的文章信息。

编程练习

导入需要的Python包

import csv
import pymongo
from datetime import datetime, timezone, timedelta
from steem.blog import Blog
from pprint import pprint
import math

定义一个日期转换函数

def date_2_local_date(_utcdate, _timedelta):
    utc_date = _utcdate.replace(tzinfo=timezone.utc)
    return utc_date.astimezone(timezone(timedelta(hours=_timedelta)))

定义计算声望的函数

def parse_reputation(raw_reputation):
    return (math.log10(int(raw_reputation)) - 9) * 9 + 25

定义获取文章详情的函数,返回字典类型的数据

blob.jpg

定义获取用户所有文章的函数,返回字典

def get_user_posts(account_name='yjcps'):
    blog = Blog(account_name)
    all_data = blog.all()
    posts = [get_post_detail(post) for post in all_data]
    return {
        'account_name':account_name,
        'count_posts':len(posts),
        'posts':posts
    }

保存所有文章数据为CSV文件

def save_to_csv(data: list, file_path, encoding):
    keys = data[0].keys()
    with open(
            file_path, 'w', newline='', encoding=encoding,
            errors='ignore') as f:
        writer = csv.DictWriter(f, fieldnames=keys)
        writer.writeheader()
        writer.writerows(data)

将数据导入到 MongoDB 数据库

# 配置数据库连接信息

MONGO_HOST = 'localhost'
MONGO_COLLECTION = 'steem'

mongo_client = pymongo.MongoClient(MONGO_HOST)
mongo_collection = mongo_client[MONGO_COLLECTION]
def save_to_mongodb(data, tablename):
    if data is not None:
        if isinstance(data, list):
            result = mongo_collection[tablename].insert_many(data)
            if result.acknowledged:
                print('已导入%d条数据' % len(result.inserted_ids))
                return result.inserted_ids
        if isinstance(data, dict):
            if mongo_collection[tablename].update_one({
                    'account_name': data.get('account_name')}, {
                    '$set': data}, upsert=True):
                print('已导入%s的数据' % data.get('account_name'))
                return True
    else:
        return None
yjcps = get_user_posts('yjcps')
# 查看返回的数据
pprint(yjcps)
{'account_name': 'yjcps',
 'count_posts': 35,
 'posts': [{'active': '2018-08-23 11:32:18',
            'active_voter': 'steempress,justyy,happyukgo,doraemon,superbing,dailystats,cryptocurrencyhk,jianan,steemtaker,cnbuddy,anxin,foodielifestyle,woolfe19861008,dailychina,yjcps,dongfengman,serenazz,shentrading,ethanlee,fanso,lilypang22,steempress-io,sweet-jenny8,shine.wong,turtlegraphics,regretfulwooden,witnesstools,happyfamily,ilovecoding',
            'author': 'yjcps',
            'author_reputation': 51.616714372034224,
            'body': '# Learn Python with Steem #09 笔记\n'
                    '\n'
                    '---\n'
                    '\n'
                    '\n'
                    '\n'
                    '## 划重点\n'
                    '\n'
                    '- 遍历字典\n'
                    '\n'
                    ' 利用dict.items()方法,用一个循环语句遍历整个字典的所有元素。\n'
                ...
                ...
                    "     'last_post_date': '2018-08-16-Thu 20:57:21',\n"
                    "     'post_count': 5926,\n"
                    "     'reputation': 67.26589131476406,\n"
                    "     'sbd_balance': '30.081 SBD',\n"
                    "     'sp': 5186.613554620994,\n"
                    "     'vesting_shares': 10501691.705077,\n"
                    "     'vot

limit_output extension: Maximum message size of 9996 exceeded with 57914 characters

file_path = '{}_all_posts_{}.csv'.format(yjcps['account_name'], yjcps['count_posts'])
# 保存数据为CSV文件,方便在Windows的Excel软件打开,选择编码为gbk,这样会丢弃不能编码的字符
# 可以选择utf-8编码,但需要从Excel中手动导入数据
# 如果日后编程需要取用,建议使用utf-8编码

save_to_csv(yjcps['posts'], file_path, 'gbk')

哈哈,我的所有文章数据,放到Excel里打开

图片.png

#保存到数据库
save_to_mongodb(yjcps, 'steem_posts')
已导入yjcps的数据





True

成功保存到数据库

图片.png

取大鹏的数据看看

dapeng = get_user_posts('dapeng')
file_path = '{}_all_posts_{}.csv'.format(dapeng['account_name'], dapeng['count_posts'])
save_to_csv(dapeng['posts'], file_path, 'gbk')
save_to_mongodb(dapeng, 'steem_posts')
已导入dapeng的数据





True

花了5分钟来获取大鹏的所有文章数据,一共497篇文章,难怪要等那么久,以后从数据库获取数据会快一点。
图片.png

再看看刘美女的文章

deanliu = get_user_posts('deanliu')
WARNING:root:Retry in 1s -- RPCErrorRecoverable: non-200 response: 502 from api.steemit.com
WARNING:root:Retry in 1s -- RPCErrorRecoverable: non-200 response: 502 from api.steemit.com
file_path = '{}_all_posts_{}.csv'.format(deanliu['account_name'], deanliu['count_posts'])
save_to_csv(deanliu['posts'], file_path, 'gbk')
save_to_mongodb(deanliu, 'steem_posts')
已导入deanliu的数据





True

获取刘美女的所有文章花了11分钟,一共878篇文章,高产作家!

图片.png

也成功保存了CSV文件。

图片.png

补充

为所有文章生成词云图片

导入需要的Python包

import numpy as np
import jieba.analyse
import PIL.Image as Image
import re
from matplotlib import pyplot as plt
from collections import Counter
from wordcloud import WordCloud, ImageColorGenerator

定义一个清理文章的函数,只保留中文

# 去除所有半角全角符号,只留字母、数字、中文。
def clean_text(text):
    # rule = re.compile(r"[^a-zA-Z0-9\u4e00-\u9fa5]")
    rule = re.compile(u"[^\u4e00-\u9fa5]")
    text = rule.sub(' ', text)
    return text

为文章分词,并统计每个关键词的频率

def get_tag(text, cnt):
    re_text = clean_text(text)
    tag_list = jieba.analyse.extract_tags(re_text)
    for tag in tag_list:
        cnt[tag] += 1
# 取出所有文章,放到一个列表里面
yjcps_all_post = [post['body'] for post in yjcps['posts']]

blob.jpg

# 获取词频
yjcps_post_counter = Counter()
for post in yjcps_all_post:
    get_tag(post, yjcps_post_counter)

blob.jpg

# 画词云图

# 指定中文字体
font = 'DroidSansFallbackFull.ttf'

wc = WordCloud(
    font_path=font,
    background_color="white",
    max_words=2000,
    max_font_size=100,
    width=800,
    height=800,
)

wc.generate_from_frequencies(yjcps_post_counter)

plt.figure(figsize=(8, 8))
plt.imshow(wc, interpolation="none")
plt.axis("off")
plt.show()

output_49_0.png

再来看看大鹏的文章关键词

dapeng_all_post = [post['body'] for post in dapeng['posts']]
# 获取词频
dapeng_post_counter = Counter()
for post in dapeng_all_post:
    get_tag(post, dapeng_post_counter)

# 画词云图

# 指定中文字体
font = 'DroidSansFallbackFull.ttf'
# 指定背景图片
bg_image = np.array(Image.open("steemit.png"))

wc = WordCloud(
    font_path=font,
    background_color="white",
    max_words=2000,
    max_font_size=100,
    mask=bg_image)

wc.generate_from_frequencies(dapeng_post_counter)

plt.figure(figsize=(8, 8))
plt.imshow(wc, interpolation="none")
plt.axis("off")
plt.show()

output_51_0.png

同样的,再看看刘美女的关键词

deanliu_all_post = [post['body'] for post in deanliu['posts']]
# 获取词频
deanliu_post_counter = Counter()
for post in deanliu_all_post:
    get_tag(post, deanliu_post_counter)

# 画词云图

# 指定中文字体
font = 'DroidSansFallbackFull.ttf'
# 指定背景图片
bg_image = np.array(Image.open("steemit2.png"))
# 取图片的颜色作为绘制词云图片的颜色
image_colors = ImageColorGenerator(bg_image)

wc = WordCloud(
    font_path=font,
    background_color="white",
    max_words=2000,
    max_font_size=20,
    mask=bg_image)

wc.generate_from_frequencies(deanliu_post_counter)

plt.figure(figsize=(8, 8))
plt.imshow(wc.recolor(color_func=image_colors), interpolation="none")
plt.axis("off")
plt.show()

output_53_0.png

绘制发帖时间图

导入Python包

from windrose import WindroseAxes
import numpy as np
from matplotlib import pyplot as plt
# 获取发帖时间,返回小时,获取的时间已处理为北京时间

def get_post_time(account:dict, _type:str='created')->list:
    posts = account['posts']
    time_format = '%Y-%m-%d %H:%M:%S'
    posts_time = [datetime.strptime(post[_type], time_format)  for post in posts]
    hours = [time.hour for time in posts_time]
    return hours
yjcps_post_time = get_post_time(yjcps)
dapeng_post_time = get_post_time(dapeng)
deanliu_post_time = get_post_time(deanliu)
# 我所有的发帖时间
print(yjcps_post_time)

[15, 6, 7, 6, 6, 6, 14, 7, 22, 7, 10, 0, 12, 20, 20, 19, 13, 0, 21, 18, 11, 11, 12, 1, 18, 15, 13, 12, 14, 11, 0, 0, 23, 18, 21, 15]

绘制我的发帖时间图

def make_time_image(time, colors=None, edgecolor='white', **kwargs):
    ws = np.array(time)
    wd = np.array(time) / 24 * 360

    # 统计各时间次数
    count, _ = np.histogram(time, bins=24)

    if colors:
        pass
    else:
        colors = [(94/255, 79/255, 162/255)]

    # 画图
    ax = WindroseAxes.from_ax()
    ax.bar(wd, ws, nsector=24, normed=True, colors=colors,
           bins=1, edgecolor=edgecolor, **kwargs)

    xticklabels = ['6:00', '3:00', '0:00',
                   '21:00', '18:00', '15:00', '12:00', '9:00']

    # 获取最大极径
    rmax = ax.get_rmax()
    fmt = "%.1f "
    yticklabels = np.linspace(0, rmax, 6) / rmax * max(count)
    yticklabels = [fmt % r for r in yticklabels[1:]]

    # 设置标签
    ax.set_yticklabels(yticklabels)
    ax.set_xticklabels(xticklabels)

    # 显示
    plt.show()
make_time_image(yjcps_post_time)

image.png

绘制大鹏的发帖时间图

make_time_image(dapeng_post_time)

image.png

最后绘制刘美女的发帖时间图

make_time_image(deanliu_post_time)

image.png

看到词云图和发帖时间图,大家是否想起了大鹏举办的 “猜猜 TA 是谁” 活动?

猴年马月终于到了, @dapeng @angelina6688 山寨版的用Python画这两个图的方法就在这里了。

期待下个猴年马月, @dapeng 写个正宗的用R画词云图和发帖时间图。


[DA series – Learn Python with Steem]

我的笔记: