研究报告代码
问题1
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import rcParams
import os
# Use a CJK-capable font (SimHei) so the Chinese titles/labels in the figures render correctly
rcParams['font.sans-serif'] = ['SimHei']
rcParams['axes.unicode_minus'] = False  # keep minus signs readable when a CJK font is active
def myplot(Delta_list, update, visit, save_path=None):
    """Plot the synchronization-age trajectory and mark update/visit slots.

    Delta_list: AoS value for each time slot (index = slot number).
    update: slots where the source state changed (green dashed lines).
    visit: slots where the UAV visited the source (red dotted lines).
    save_path: optional file name; when given, the figure is saved next to
        this script before being shown.
    """
    slots = np.arange(len(Delta_list))
    plt.plot(slots, Delta_list, marker='o')  # the AoS curve itself
    # Vertical markers; only the first line of each group carries a label so
    # the legend shows a single entry per group.
    for t in update:
        plt.axvline(x=t, color="green", linestyle="--", label="update" if t == update[0] else "")
    for t in visit:
        plt.axvline(x=t, color="red", linestyle=":", label="visit" if t == visit[0] else "")
    # Chinese title and axis labels (font configured at module level).
    plt.title('问题一 示例图-1')
    plt.xlabel('时隙k')
    plt.ylabel(r'同步年龄 $\Delta_k$')
    plt.legend()
    plt.grid(True)
    if save_path:
        # Save relative to the script's own directory, not the CWD.
        script_dir = os.path.dirname(os.path.realpath(__file__))
        plt.savefig(os.path.join(script_dir, save_path), dpi=300, bbox_inches='tight')
    plt.show()
def get_Delta(source_update, visit, k_max):
    """Compute the Age of Synchronization (AoS) for slots 0..k_max-1.

    source_update: slots at which the information source changes state.
    visit: slots at which the UAV visits (synchronizes with) the source.
    k_max: total number of time slots to simulate.

    Returns a list where element k is the AoS at slot k: 0 while the most
    recent visit is at least as recent as the most recent source change,
    otherwise the number of slots elapsed since synchronization was lost.
    """
    # Sets give O(1) membership tests inside the loop (the originals were
    # O(n) list scans per slot).
    update_set = set(source_update)
    visit_set = set(visit)
    Delta_list = []
    Delta_k = 0
    U_k = -1  # latest slot <= k at which the source changed state (-1: never)
    V_k = -1  # latest slot <= k at which the UAV visited the source (-1: never)
    for k in range(k_max):
        if k in update_set:
            U_k = k
        if k in visit_set:
            V_k = k
        if U_k <= V_k:
            # The last visit is at least as recent as the last change: in sync.
            Delta_k = 0
        else:
            # The source changed after the last visit: age grows by one slot.
            Delta_k += 1
        Delta_list.append(Delta_k)
    return Delta_list
# Example scenario for Problem 1: fixed horizon, source-update slots and
# UAV-visit slots; compute the AoS trajectory and plot it.
k_max = 30                    # total number of time slots in this example
source_update = [7, 14, 19]   # slots where the source state changes
visit = [5, 10, 26]           # slots where the UAV visits the source
print('源更新发生时刻:', source_update)
print('无人机访问发生时刻:', visit)
save_path = 'Problem 1-1.png'
Delta_list = get_Delta(source_update, visit, k_max)
myplot(Delta_list, source_update, visit, save_path)
问题2
import pandas as pd
from collections import defaultdict
import numpy as np
import os
class DiscountedMDP:
    """Discounted-cost MDP for the UAV visiting problem.

    State (t, s): t is the number of slots since the last visit, s is the
    source state observed at that last visit.  Action 1 = visit (pay C and
    reset t to 1), action 0 = wait (pay w * expected AoS).
    """

    def __init__(self, p, q, w, alpha, C=1, init_max_k=50):
        """
        p: probability the source flips from state 0 to state 1
        q: probability the source flips from state 1 to state 0
        w: weight of the synchronization age in the cost
        alpha: discount factor
        C: cost of one visit
        init_max_k: initial cap on the inter-visit interval t
        """
        self.p = p
        self.q = q
        self.w = w
        self.alpha = alpha
        self.C = C
        self.init_max_k = init_max_k
        # Value function and policy, both keyed by the state tuple (t, s).
        self.states = defaultdict(float)
        self.policy = defaultdict(int)
        for t in range(1, init_max_k + 1):
            for s in [0, 1]:
                self.states[(t, s)] = 0
                # Bug fix: was `self.policy[t] = 1` (int key) while every
                # later access uses the (t, s) tuple key.
                self.policy[(t, s)] = 1
        # Actions: 0 = do not visit, 1 = visit.
        self.actions = [0, 1]

    def get_transition_prob(self, n):
        """Return the n-step transition matrix of the 2-state source chain."""
        P = np.array([[1 - self.p, self.p],
                      [self.q, 1 - self.q]])
        return np.linalg.matrix_power(P, n)

    def expected_AoS(self, t, s):
        """Expected synchronization age t slots after a visit that observed s.

        Case split over when the source first left the observed state
        (delta = age contribution); probabilities for each case sum to 1.
        """
        E_AoS = 0
        if s == 0:  # last visit observed state 0
            for delta in range(t + 1):
                if delta == 0:
                    prob = (1 - self.p) ** t                       # source never changed
                else:
                    prob = (1 - self.p) ** (t - delta) * self.p    # changed delta slots ago
                E_AoS += delta * prob
        else:  # last visit observed state 1
            for delta in range(t + 1):
                if delta == 0:
                    prob = self.q * (1 - self.p) ** (t - 1)
                elif delta < t:
                    prob = self.q * (1 - self.p) ** (t - delta - 1) * self.p
                else:
                    prob = 1 - self.q
                E_AoS += delta * prob
        return E_AoS

    def value_iteration(self, epsilon=1e-8):
        """Value iteration with a dynamically adjusted cap max_k on t.

        Returns (states, policy) after convergence (max value change below
        epsilon and the cap consistent with the boundary policy).
        """
        max_k = self.init_max_k
        while True:
            state_old = self.states.copy()  # snapshot of the value function
            delta = 0                       # largest value change this sweep
            for state in self.states.keys():
                t, s = state
                P_n = self.get_transition_prob(t)
                # Value of visiting now: pay C, then restart from t = 1.
                V_visit = self.C
                for s_next in [0, 1]:
                    V_visit += self.alpha * P_n[s, s_next] * state_old[(1, s_next)]
                # Value of waiting one more slot.
                if t < max_k:
                    V_not_visit = self.w * self.expected_AoS(t + 1, s) + self.alpha * state_old[(t + 1, s)]
                else:
                    # Waiting is forbidden at the cap: prohibitive cost.
                    # Bug fix: was `V_not_visit += 99999`, which reused the
                    # stale value left over from the previous loop iteration.
                    V_not_visit = 99999
                self.states[(t, s)] = min(V_visit, V_not_visit)
                self.policy[(t, s)] = 1 if V_visit < V_not_visit else 0
                delta = max(delta, abs(self.states[(t, s)] - state_old[(t, s)]))
            # On convergence, shrink or grow the cap until the boundary
            # policy is consistent, then stop.
            if delta < epsilon:
                if self.policy[(max_k - 1, 0)] == 1 and self.policy[(max_k - 1, 1)] == 1:
                    # Visiting already optimal just below the cap: drop the top states.
                    del self.states[(max_k, 0)]
                    del self.states[(max_k, 1)]
                    del self.policy[(max_k, 0)]
                    del self.policy[(max_k, 1)]
                    max_k = int(len(self.states) / 2)
                elif self.policy[(max_k, 0)] == 0 or self.policy[(max_k, 1)] == 0:
                    # Waiting still optimal at the cap: extend the state space.
                    self.states[(max_k + 1, 0)] = self.states[(max_k, 0)]
                    self.states[(max_k + 1, 1)] = self.states[(max_k, 1)]
                    self.policy[(max_k + 1, 0)] = 1
                    self.policy[(max_k + 1, 1)] = 1
                    max_k = int(len(self.states) / 2)
                else:
                    break  # converged
        return self.states, self.policy
if __name__ == '__main__':
    # Solve the discounted problem for one parameter setting and print the
    # optimal action for every state.
    p, q, w, alpha = 0.1, 0.1, 0.1, 0.9
    print("-----------------------------begin-----------------------------")
    V, policy = DiscountedMDP(p=p, q=q, w=w, alpha=alpha).value_iteration()
    print("-------------------p={} q={} w={} alpha={}-----------------".format(p, q, w, alpha))
    log_list = []
    for (t, s) in V.keys():
        action = 'Visit' if policy[(t, s)] == 1 else 'Not Visit'
        print(f"State (t={t}, s={s}): {action}")
        log_list.append(f"| State (t_k {t} s_k {s}) | {action} ")
    print("------------------------------end------------------------------")
问题3
import pandas as pd
from collections import defaultdict
import numpy as np
import os
class AverageCostMDP:
    """Average-cost MDP for the UAV visiting problem.

    State (t, s): t is the number of slots since the last visit, s is the
    source state observed at that visit.  Action 1 = visit (pay C),
    action 0 = wait (pay w * expected AoS).
    """

    def __init__(self, p, q, w, C=1, init_max_k=30):
        """
        p: probability the source flips from state 0 to state 1
        q: probability the source flips from state 1 to state 0
        w: weight of the synchronization age in the cost
        C: cost of one visit
        init_max_k: initial cap on the inter-visit interval t
        """
        self.p = p
        self.q = q
        self.w = w
        self.C = C
        self.init_max_k = init_max_k
        self.pi_0, self.pi_1 = self.get_stationary(self.p, self.q)
        # Value function and policy, both keyed by the state tuple (t, s).
        self.states = defaultdict(float)
        self.policy = defaultdict(int)
        for t in range(1, self.init_max_k + 1):
            for s in [0, 1]:
                self.states[(t, s)] = 0
                # Bug fix: was `self.policy[t] = 1` (int key) while every
                # later access uses the (t, s) tuple key.
                self.policy[(t, s)] = 1
        # Actions: 0 = do not visit, 1 = visit.
        self.actions = [0, 1]

    def get_stationary(self, p, q):
        """Stationary distribution (pi_0, pi_1) of the 2-state source chain."""
        return q / (p + q), p / (p + q)

    def get_transition_prob(self, n):
        """Return the n-step transition matrix of the 2-state source chain."""
        P = np.array([[1 - self.p, self.p],
                      [self.q, 1 - self.q]])
        return np.linalg.matrix_power(P, n)

    def expected_AoS(self, t, s):
        """Expected synchronization age t slots after a visit that observed s."""
        E_AoS = 0
        if s == 0:  # last visit observed state 0
            for delta in range(t + 1):
                if delta == 0:
                    prob = (1 - self.p) ** t
                else:
                    prob = (1 - self.p) ** (t - delta) * self.p
                E_AoS += delta * prob
        else:  # last visit observed state 1
            for delta in range(t + 1):
                if delta == 0:
                    prob = self.q * (1 - self.p) ** (t - 1)
                elif delta < t:
                    prob = self.q * (1 - self.p) ** (t - delta - 1) * self.p
                else:
                    prob = 1 - self.q
                E_AoS += delta * prob
        return E_AoS

    def relative_value_iteration(self, epsilon=1e-6, max_iter=1000):
        """Relative value iteration with a dynamically adjusted cap max_k.

        The average cost lambda_k is re-estimated each sweep from the fixed
        reference states (1, 0) and (1, 1), weighted by the stationary
        distribution, and subtracted from every value to keep the iteration
        bounded.  Returns (states, policy).
        """
        lambda_k = 0  # running estimate of the average cost
        max_k = self.init_max_k
        n_iter = 0    # renamed from `iter`, which shadowed the builtin
        while True:
            state_old = self.states.copy()
            lambda_k_old = lambda_k
            delta = 0  # largest value change this sweep
            # Estimate the average cost from the fixed states (1,0), (1,1).
            fix_state_value = []
            for state in [(1, 0), (1, 1)]:
                t, s = state
                P_n = self.get_transition_prob(t)
                V_visit = self.C
                for s_next in [0, 1]:
                    V_visit += P_n[s, s_next] * state_old[(1, s_next)]
                # NOTE(review): the main sweep below charges expected_AoS(t+1, s)
                # for waiting, while here expected_AoS(t, s) is used — confirm
                # this asymmetry is intended.
                V_not_visit = self.w * self.expected_AoS(t, s)
                V_not_visit += state_old[(t + 1, s)]
                fix_state_value.append(min(V_visit, V_not_visit))
            lambda_k = self.pi_0 * fix_state_value[0] + self.pi_1 * fix_state_value[1]
            # Bellman update for every state.
            for state in self.states.keys():
                t, s = state
                P_n = self.get_transition_prob(t)
                V_visit = self.C
                for s_next in [0, 1]:
                    V_visit += P_n[s, s_next] * state_old[(1, s_next)]
                if t < max_k:
                    V_not_visit = self.w * self.expected_AoS(t + 1, s) + state_old[(t + 1, s)]
                else:
                    V_not_visit = 99999  # waiting is forbidden at the cap
                self.states[(t, s)] = min(V_visit, V_not_visit) - lambda_k
                self.policy[(t, s)] = 1 if V_visit < V_not_visit else 0
                delta = max(delta, abs(self.states[(t, s)] - state_old[(t, s)]))
            # On convergence (or periodically, as a safety valve) adjust the cap.
            # Bug fix: the periodic trigger `iter % max_iter == 0` also fired on
            # the very first sweep (iter == 0), resizing before anything had
            # converged; it is now skipped at n_iter == 0.
            if (abs(lambda_k - lambda_k_old) < epsilon and delta < epsilon) \
                    or (n_iter > 0 and n_iter % max_iter == 0):
                if self.policy[(max_k - 1, 0)] == 1 and self.policy[(max_k - 1, 1)] == 1:
                    # Visiting already optimal just below the cap: drop the top states.
                    del self.states[(max_k, 0)]
                    del self.states[(max_k, 1)]
                    del self.policy[(max_k, 0)]
                    del self.policy[(max_k, 1)]
                elif self.policy[(max_k, 0)] == 0 or self.policy[(max_k, 1)] == 0:
                    # Waiting still optimal at the cap: extend the state space.
                    self.states[(max_k + 1, 0)] = 0
                    self.states[(max_k + 1, 1)] = 0
                    self.policy[(max_k + 1, 0)] = 1
                    self.policy[(max_k + 1, 1)] = 1
                else:
                    break  # converged
            max_k = int(len(self.states) / 2)
            n_iter += 1
        return self.states, self.policy
if __name__ == '__main__':
    # Solve the average-cost problem for one parameter setting and print the
    # optimal action for every state.
    p, q, w = 0.1, 0.1, 0.1
    print("-----------------------------begin-----------------------------")
    V, policy = AverageCostMDP(p=p, q=q, w=w).relative_value_iteration()
    print("------------------------p={} q={} w={} -----------------".format(p, q, w))
    log_list = []
    for (t, s) in V.keys():
        action = 'Visit' if policy[(t, s)] == 1 else 'Not Visit'
        print(f"State (t={t}, s={s}): {action}")
        log_list.append(f"| State (t_k {t} s_k {s}) | {action} ")
    print("------------------------------end------------------------------")
问题4
问题4.1
from problem2 import DiscountedMDP
import pandas as pd
from collections import defaultdict
import numpy as np
import os
if __name__ == '__main__':
    # Reproduce the Problem-2 discounted solution via the imported class.
    p, q, w, alpha = 0.1, 0.1, 0.1, 0.9
    print("-----------------------------begin-----------------------------")
    V, policy = DiscountedMDP(p=p, q=q, w=w, alpha=alpha).value_iteration()
    print("-------------------p={} q={} w={} alpha={}-----------------".format(p, q, w, alpha))
    log_list = []
    for (t, s) in V.keys():
        action = 'Visit' if policy[(t, s)] == 1 else 'Not Visit'
        print(f"State (t={t}, s={s}): {action}")
        log_list.append(f"| State (t_k {t} s_k {s}) | {action} ")
    print("------------------------------end------------------------------")
问题4.2
from problem2 import DiscountedMDP
import os
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import rcParams
import re
# Use a CJK-capable font (SimHei) so the Chinese titles/labels in the figures render correctly
rcParams['font.sans-serif'] = ['SimHei']
rcParams['axes.unicode_minus'] = False  # keep minus signs readable when a CJK font is active
def compare_plot(ind):
    """Plot how the policy thresholds change with one controlled variable.

    ind selects the controlled variable: 0-p, 1-q, 2-w, 3-alpha.  Reads the
    log workbook produced by run_variable() from ./discount_data and plots
    the minimum "Visit" threshold t_k for s_k = 0 and s_k = 1.
    """
    var_list = ['p', 'q', 'w', 'alpha']
    data_excel_list = ['compare_p.xlsx', 'compare_q.xlsx', 'compare_w.xlsx', 'compare_alpha.xlsx']
    save_path_list = ['compare_p.png', 'compare_q.png', 'compare_w.png', 'compare_alpha.png']
    folder_path = os.path.join(os.getcwd(), "discount_data")
    df = pd.read_excel(os.path.join(folder_path, data_excel_list[ind]))
    var_values = []
    threshold_0_values = []
    threshold_1_values = []
    # Patterns for log lines such as "| Var 0.1 | mini_threshold_0 3 | 'Visit' ";
    # compiled once instead of on every row.
    var_re = re.compile(r'Var\s+([\d.]+)')
    thr_re = re.compile(r'mini_threshold_(\d+)\s+(\d+)')
    for index, row in df.iterrows():
        # Bug fix: `row[0]` (positional lookup on a Series) is deprecated in
        # pandas; use .iloc for positional access.
        cell = str(row.iloc[0])
        var_match = var_re.search(cell)
        if var_match:
            var_value = float(var_match.group(1))
        threshold_match = thr_re.search(cell)
        if threshold_match:
            threshold_num = int(threshold_match.group(1))
            threshold_value = int(threshold_match.group(2))
            # The x value is recorded once, alongside the s_k = 0 series.
            if threshold_num == 0:
                var_values.append(var_value)
                threshold_0_values.append(threshold_value)
            elif threshold_num == 1:
                threshold_1_values.append(threshold_value)
    plt.figure(figsize=(10, 6))
    plt.plot(var_values, threshold_0_values, 'b-o', label='s_k = 0')
    plt.plot(var_values, threshold_1_values, 'r-o', label='s_k = 1')
    plt.xlabel('变量参数值')
    plt.ylabel('阈值(第一个最优策略为Visit的t_k)')
    plt.title('控制变量 {}'.format(var_list[ind]))
    plt.grid(True)
    plt.legend()
    # Save under <script dir>/discount_data/.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    save_path = os.path.join(script_dir, 'discount_data', save_path_list[ind])
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()
    print("picture have saved")
def run_variable(ind):
    """Sweep one parameter and log the resulting policy thresholds.

    ind selects the swept parameter (0-p, 1-q, 2-w, 3-alpha); the others
    keep their default values.  Results are written to an Excel log in
    ./discount_data for compare_plot() to read.
    """
    p, q, w, alpha = 0.1, 0.1, 0.1, 0.9
    # Values tried for the controlled variable.
    variable = [0.01, 0.02, 0.03, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6,
                0.7, 0.8, 0.9, 0.95, 0.97, 0.98, 0.99]
    folder_path = os.path.join(os.getcwd(), "discount_data")
    file_path_list = ['compare_p.xlsx', 'compare_q.xlsx', 'compare_w.xlsx', 'compare_alpha.xlsx']
    log_list = []
    for v in variable:
        if ind == 0:
            p = v
        elif ind == 1:
            q = v
        elif ind == 2:
            w = v
        elif ind == 3:
            alpha = v
        else:
            print("error ind")
            break
        mini_threshold_0 = 99999  # smallest t with action Visit for s_k = 0
        mini_threshold_1 = 99999  # smallest t with action Visit for s_k = 1
        print("-----------------------------begin-----------------------------")
        V, policy = DiscountedMDP(p=p, q=q, w=w, alpha=alpha).value_iteration()
        print("-------------------p={} q={} w={} alpha={}-----------------".format(p, q, w, alpha))
        for (t, s) in V.keys():
            print(f"State (t={t}, s={s}): {'Visit' if policy[(t,s)]==1 else 'Not Visit'}")
            if policy[(t, s)] == 1:
                if s == 0:
                    mini_threshold_0 = min(mini_threshold_0, t)
                elif s == 1:
                    mini_threshold_1 = min(mini_threshold_1, t)
        log_list.append(f"| Var {v} | mini_threshold_0 {mini_threshold_0} | 'Visit' ")
        log_list.append(f"| Var {v} | mini_threshold_1 {mini_threshold_1} | 'Visit' ")
        print("------------------------------end------------------------------")
    # Persist the log lines for the plotting step.
    file_path = os.path.join(folder_path, file_path_list[ind])
    pd.DataFrame(log_list, columns=["Log"]).to_excel(
        file_path,
        sheet_name="Sheet1",
        index=False,
    )
if __name__ == '__main__':
    # Generates the influence of each parameter on the optimal-policy threshold.
    # Line charts are written into the discount_data folder, which must exist
    # beforehand when reproducing the results.
    # Set `ind` to pick the controlled variable: 0-p, 1-q, 2-w, 3-alpha.
    ind = 2
    run_variable(ind)    # sweep the controlled variable
    compare_plot(ind)    # draw the comparison chart
问题4.3
from problem2 import DiscountedMDP
from problem3 import AverageCostMDP
import os
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import rcParams
import re
# Use a CJK-capable font (SimHei) so the Chinese titles/labels in the figures render correctly
rcParams['font.sans-serif'] = ['SimHei']
rcParams['axes.unicode_minus'] = False  # keep minus signs readable when a CJK font is active
def run_variable(p, q, w):
    """Sweep alpha toward 1 for the discounted problem and log thresholds.

    For each alpha, records the smallest t at which the optimal action is
    Visit, separately for s_k = 0 and s_k = 1.  Returns the log lines.
    """
    # alpha values approaching 1, to compare against the average-cost case.
    variable = [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8,
                0.9, 0.95, 0.97, 0.98, 0.99, 0.999]
    log_list = []
    for alpha in variable:
        mini_threshold_0 = 99999  # smallest t with action Visit for s_k = 0
        mini_threshold_1 = 99999  # smallest t with action Visit for s_k = 1
        print("-----------------------------begin-----------------------------")
        V, policy = DiscountedMDP(p=p, q=q, w=w, alpha=alpha).value_iteration()
        print("-------------------p={} q={} w={} alpha={}-----------------".format(p, q, w, alpha))
        for (t, s) in V.keys():
            print(f"State (t={t}, s={s}): {'Visit' if policy[(t,s)]==1 else 'Not Visit'}")
            if policy[(t, s)] == 1:
                if s == 0:
                    mini_threshold_0 = min(mini_threshold_0, t)
                elif s == 1:
                    mini_threshold_1 = min(mini_threshold_1, t)
        log_list.append(f"| alpha {alpha} | mini_threshold_0 {mini_threshold_0} | 'Visit' ")
        log_list.append(f"| alpha {alpha} | mini_threshold_1 {mini_threshold_1} | 'Visit' ")
        print("------------------------------end------------------------------")
    return log_list
def avgcost(p, q, w):
    """Solve the average-cost problem once; log its thresholds as alpha=1."""
    print("-----------------------------begin-----------------------------")
    V, policy = AverageCostMDP(p=p, q=q, w=w).relative_value_iteration()
    print("------------------------p={} q={} w={} -----------------".format(p, q, w))
    mini_threshold_0 = 99999  # smallest t with action Visit for s_k = 0
    mini_threshold_1 = 99999  # smallest t with action Visit for s_k = 1
    for (t, s) in V.keys():
        print(f"State (t={t}, s={s}): {'Visit' if policy[(t,s)]==1 else 'Not Visit'}")
        if policy[(t, s)] == 1:
            if s == 0:
                mini_threshold_0 = min(mini_threshold_0, t)
            elif s == 1:
                mini_threshold_1 = min(mini_threshold_1, t)
    return [
        f"| alpha {1} | mini_threshold_0 {mini_threshold_0} | 'Visit' ",
        f"| alpha {1} | mini_threshold_1 {mini_threshold_1} | 'Visit' ",
    ]
def compare_plot(p, q, w):
    """Compare discounted thresholds (alpha -> 1) with the average-cost result.

    Reads the workbook written for this (p, q, w) from ./avgcost_data; the
    last logged pair of entries is the average-cost solution and is drawn
    as single highlighted points.
    """
    folder_path = os.path.join(os.getcwd(), "avgcost_data")
    file_path = os.path.join(folder_path, 'p {} q {} w {}.xlsx'.format(p, q, w))
    df = pd.read_excel(file_path)
    var_values = []
    threshold_0_values = []
    threshold_1_values = []
    # Patterns for log lines such as "| alpha 0.9 | mini_threshold_0 3 | 'Visit' ".
    var_re = re.compile(r'alpha\s+([\d.]+)')
    thr_re = re.compile(r'mini_threshold_(\d+)\s+(\d+)')
    for index, row in df.iterrows():
        # Bug fix: `row[0]` (positional lookup on a Series) is deprecated in
        # pandas; use .iloc for positional access.
        cell = str(row.iloc[0])
        var_match = var_re.search(cell)
        if var_match:
            var_value = float(var_match.group(1))
        threshold_match = thr_re.search(cell)
        if threshold_match:
            threshold_num = int(threshold_match.group(1))
            threshold_value = int(threshold_match.group(2))
            if threshold_num == 0:
                var_values.append(var_value)
                threshold_0_values.append(threshold_value)
            elif threshold_num == 1:
                threshold_1_values.append(threshold_value)
    plt.figure(figsize=(10, 6))
    plt.plot(var_values[:-1], threshold_0_values[:-1], 'b-o', label='Disc s_k = 0')
    print(var_values[:-1], threshold_0_values[:-1])
    # Bug fix: the format string 'ro' already fixes the colour to red, which
    # conflicted with the color= keyword; use a bare marker instead.
    plt.plot(var_values[-1], threshold_0_values[-1], 'o', color='green', label='Avg_cost s_k = 0')
    plt.plot(var_values[:-1], threshold_1_values[:-1], 'r-o', label='Disc s_k = 1')
    plt.plot(var_values[-1], threshold_1_values[-1], 'o', color='yellow', label='Avg_cost s_k = 1')
    plt.xlabel('变量参数值')
    plt.ylabel('阈值(第一个最优策略为Visit的t_k)')
    plt.title('超参数 p-{} q-{} w-{}'.format(p, q, w))
    plt.grid(True)
    plt.legend()
    script_dir = os.path.join(os.getcwd(), "avgcost_data")
    save_path = os.path.join(script_dir, 'p {} q {} w {}.png'.format(p, q, w))
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()
    print("picture have saved")
if __name__ == '__main__':
    p, q, w = 0.2, 0.2, 0.05
    # Run the discounted problem with alpha approaching 1, then the
    # average-cost problem, and merge the two logs.
    log_list = run_variable(p, q, w) + avgcost(p, q, w)
    # Persist the combined log for the plotting step.
    folder_path = os.path.join(os.getcwd(), "avgcost_data")
    file_path = os.path.join(folder_path, 'p {} q {} w {}.xlsx'.format(p, q, w))
    pd.DataFrame(log_list, columns=["Log"]).to_excel(
        file_path,
        sheet_name="Sheet1",
        index=False,
    )
    # Draw the comparison chart.
    compare_plot(p, q, w)