研究报告代码

Administrator 6 2024-12-21

问题1

import matplotlib.pyplot as plt
import numpy as np
from matplotlib import rcParams
import os
# 配置图标中文字体
rcParams['font.sans-serif'] = ['SimHei'] 
rcParams['axes.unicode_minus'] = False   

def myplot(Delta_list,update,visit,save_path=None):

    k = np.arange(0, len(Delta_list))
  
    plt.plot(k, Delta_list, marker='o') # 绘制折线图
    # 添加 "visit" 和 "update" 的竖线
    for t in update:
        plt.axvline(x=t, color="green", linestyle="--", label="update" if t == update[0] else "")
    for t in visit:
        plt.axvline(x=t, color="red", linestyle=":", label="visit" if t == visit[0] else "")

    # 设置中文标题和标签
    plt.title('问题一  示例图-1') 
    plt.xlabel('时隙k')  
    plt.ylabel(r'同步年龄 $\Delta_k$')     
    plt.legend()
    plt.grid(True)
    if save_path:
        script_dir = os.path.dirname(os.path.realpath(__file__))
        save_path = os.path.join(script_dir, save_path)
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()

def get_Delta(source_update,visit,k_max):
    Delta_list = []
    Delta_k = 0
    U_k = -1 # U_k 表示信息源在k时刻及以前最近一次发生状态变化的时刻
    V_k = -1 # V_k 表示无人机在k时刻及以前最近一次访问信息源的时刻
    for k in range(k_max):
        if k in source_update:
            U_k = k
        if k in visit:
            V_k = k

        if U_k <= V_k:
            Delta_k=0
        else:
            Delta_k+=1
        # print('当前时刻 {} | U_k {} | V_k {} | Delta_k {}'.format(k,U_k,V_k,Delta_k))
        Delta_list.append(Delta_k)
    return Delta_list

k_max = 30 # 本样例中总时长
source_update = [7,14,19] # 源更新时刻
visit = [5,10,26] # 无人机访问时隙

print('源更新发生时刻:',source_update)
print('无人机访问发生时刻:',visit)
Delta_list= get_Delta(source_update,visit,k_max)
save_path = 'Problem 1-1.png'
myplot(Delta_list,source_update,visit,save_path)

问题2

import pandas as pd
from collections import defaultdict
import numpy as np
import os

class DiscountedMDP:
    def __init__(self, p, q, w, alpha, C=1, init_max_k=50):
        """
        初始化MDP参数
        p: 从状态0转移到状态1的概率
        q: 从状态1转移到状态0的概率
        w: 同步年龄的权重
        alpha: 折扣因子
        C: 访问成本
        init_max_k: 初始最大时间间隔
        """
        self.p = p
        self.q = q
        self.w = w
        self.alpha = alpha
        self.C = C
        self.init_max_k = init_max_k

        # 状态空间: (t, s), t为时间间隔,s为上次访问时的源状态
        self.states = defaultdict(float)
        self.policy = defaultdict(int)
        for t in range(1, init_max_k+1):
            for s in [0, 1]:
                self.states[(t,s)] = 0
                self.policy[t] = 1

        # 动作空间: 0表示不访问,1表示访问
        self.actions = [0, 1]
      
    def get_transition_prob(self, n):
        """计算n步转移概率矩阵"""
        # 初始转移矩阵
        P = np.array([[1-self.p, self.p], 
                     [self.q, 1-self.q]])
        # 计算n步转移
        return np.linalg.matrix_power(P, n)
  
    def expected_AoS(self, t, s):
        """计算不访问时的期望同步年龄"""
        E_AoS = 0
        if s == 0: # 上一次访问时信息源状态为0
            for delta in range(t+1):
                # 分成两类
                if delta == 0:
                    prob = (1-self.p)**t
                else:
                    prob = (1-self.p)**(t-delta)*self.p
                E_AoS += delta * prob
        else: # 上一次访问时信息源状态为0
            for delta in range(t+1):
                # 分成三类
                if delta == 0:
                    prob = self.q*(1-self.p)**(t-1)
                elif delta < t:
                    prob = self.q*(1-self.p)**(t-delta-1)*self.p
                else:
                    prob = 1-self.q
                E_AoS += delta * prob
        return E_AoS

    def value_iteration(self, epsilon=1e-8):
        """值迭代算法"""
        max_k = self.init_max_k
        while True:
            state_old = self.states.copy() # 保存旧的状态-价值字典
            delta = 0 # 每一轮迭代最大差异值

            for state in self.states.keys(): # 遍历状态-价值字典
                t, s = state
                P_n = self.get_transition_prob(t) # 计算转移概率矩阵
                V_visit = self.C # 访问开销
                # 计算访问动作的值
                for s_next in [0, 1]:
                    V_visit += self.alpha * P_n[s, s_next] * state_old[(1, s_next)]
              
                # 计算不访问动作的值
                if t < max_k:
                    V_not_visit = self.w * self.expected_AoS(t+1, s) + self.alpha * state_old[(t+1, s)]
                else:
                    V_not_visit += 99999

                # 更新值函数和策略
                self.states[(t, s)] = min(V_visit, V_not_visit)
                self.policy[(t, s)] = 1 if V_visit < V_not_visit else 0

                delta = max(delta, abs(self.states[(t, s)] - state_old[(t, s)]))
          
            # 动态更新状态数,扩展或缩小阈值
            if delta < epsilon:
                if self.policy[(max_k-1,0)] == 1 and self.policy[(max_k-1,1)] == 1:
                    del self.states[(max_k,0)]
                    del self.states[(max_k,1)]
                    del self.policy[(max_k,0)]
                    del self.policy[(max_k,1)]
                    max_k = int(len(self.states)/2)
                elif self.policy[(max_k,0)] == 0 or self.policy[(max_k,1)] == 0:
                    # print(max_k,self.policy[(max_k,0)],self.policy[(max_k,1)])
                    self.states[(max_k+1,0)] = self.states[(max_k,0)]
                    self.states[(max_k+1,1)] = self.states[(max_k,1)]
                    self.policy[(max_k+1,0)] = 1
                    self.policy[(max_k+1,1)] = 1
                    max_k = int(len(self.states)/2)
                else:
                    break # 收敛

        return self.states, self.policy


if __name__ == '__main__':
    # 创建MDP实例
    p=0.1
    q=0.1
    w=0.1
    alpha=0.9
      
    print("-----------------------------begin-----------------------------")
    mdp = DiscountedMDP(p=p, q=q, w=w, alpha=alpha)

    # 运行值迭代算法
    V, policy = mdp.value_iteration()
    log_list = []
    print("-------------------p={} q={} w={} alpha={}-----------------".format(p,q,w,alpha))
    # 输出最优策略
    for (t,s) in V.keys():
        print(f"State (t={t}, s={s}): {'Visit' if policy[(t,s)]==1 else 'Not Visit'}")

        log_str = f"| State (t_k {t} s_k {s}) | {'Visit' if policy[(t,s)]==1 else 'Not Visit'} "
        log_list.append(log_str)

        print("------------------------------end------------------------------")

问题3

import pandas as pd
from collections import defaultdict
import numpy as np
import os

class AverageCostMDP:
    def __init__(self, p, q, w, C=1, init_max_k=30):
        """
        初始化MDP参数
        p: 从状态0转移到状态1的概率
        q: 从状态1转移到状态0的概率
        w: 同步年龄的权重
        C: 访问成本
        init_max_k: 初始最大时间间隔
        """
        self.p = p
        self.q = q
        self.w = w
        self.C = C
        self.init_max_k = init_max_k
        self.pi_0, self.pi_1 = self.get_stationary(self.p,self.q)

        # 状态-价值字典: (t, s)-V, t为时间间隔,s为上次访问时的源状态,v为value
        self.states = defaultdict(float)
        self.policy = defaultdict(int)
        for t in range(1, self.init_max_k+1):
            for s in [0, 1]:
                self.states[(t,s)] = 0
                self.policy[t] = 1
      
        # 动作空间: 0表示不访问,1表示访问
        self.actions = [0, 1]
 
    def get_stationary(self,p,q):
        """计算稳态分布"""
        return q/(p+q), p/(p+q)
  
    def get_transition_prob(self, n):
        """计算n步转移概率矩阵"""
        # 初始转移矩阵
        P = np.array([[1-self.p, self.p], 
                     [self.q, 1-self.q]])
        # 计算n步转移
        return np.linalg.matrix_power(P, n)
  
    def expected_AoS(self, t, s):
        """计算不访问时的期望同步年龄"""
        E_AoS = 0
        if s == 0:
            for delta in range(t+1):
                if delta == 0:
                    prob = (1-self.p)**t
                else:
                    prob = (1-self.p)**(t-delta)*self.p
                E_AoS += delta * prob
        else:
            for delta in range(t+1):
                if delta == 0:
                    prob = self.q*(1-self.p)**(t-1)
                elif delta < t:
                    prob = self.q*(1-self.p)**(t-delta-1)*self.p
                else:
                    prob = 1-self.q
                E_AoS += delta * prob
        return E_AoS

    def relative_value_iteration(self, epsilon=1e-6,max_iter = 1000):
        """相对值迭代算法"""
        # 初始化值函数和平均开销
        lambda_k = 0  # 平均开销
        max_k = self.init_max_k
        iter = 0
        while True:
            state_old = self.states.copy()
            lambda_k_old = lambda_k
            delta = 0

            #  固定状态(1,0)和(1,1),计算平均价值函数
            fix_state_value = []
            for state in [(1,0), (1,1)]:
                t, s = state
                P_n = self.get_transition_prob(t)
                # 计算访问动作的值
                V_visit = self.C
                for s_next in [0, 1]:
                    V_visit += P_n[s, s_next] * state_old[(1, s_next)]
                # 计算不访问动作的值
                V_not_visit = self.w * self.expected_AoS(t, s)
                V_not_visit += state_old[(t+1, s)]
                # 选择最小值作为新的值函数
                fix_state_value.append(min(V_visit, V_not_visit))
            lambda_k = self.pi_0*fix_state_value[0] + self.pi_1*fix_state_value[1]

            # 对每个状态进行更新
            for state in self.states.keys():
                t, s = state
                # 计算访问动作的值
                P_n = self.get_transition_prob(t)
                V_visit = self.C
                for s_next in [0, 1]:
                    V_visit += P_n[s, s_next] * state_old[(1, s_next)]
              
                # 计算不访问动作的值
                if t < max_k:
                    V_not_visit = self.w * self.expected_AoS(t+1, s) + state_old[(t+1, s)]
                else:
                    V_not_visit = 99999
              
                # 选择最小值作为新的值函数
                self.states[(t, s)] = min(V_visit, V_not_visit) - lambda_k
                self.policy[(t, s)] = 1 if V_visit < V_not_visit else 0
                delta = max(delta, abs(self.states[(t, s)] - state_old[(t, s)]))
          
            # 动态更新状态数,扩展或缩小阈值
            if (abs(lambda_k - lambda_k_old) < epsilon and delta < epsilon) or iter % max_iter == 0:
                if self.policy[(max_k-1,0)] == 1 and self.policy[(max_k-1,1)] == 1:
                    del self.states[(max_k,0)]
                    del self.states[(max_k,1)]
                    del self.policy[(max_k,0)]
                    del self.policy[(max_k,1)]
                elif self.policy[(max_k,0)] == 0 or self.policy[(max_k,1)] == 0:
                    self.states[(max_k+1,0)] = 0
                    self.states[(max_k+1,1)] = 0
                    self.policy[(max_k+1,0)] = 1
                    self.policy[(max_k+1,1)] = 1
                else:
                    break # 收敛
                max_k = int(len(self.states)/2)
            iter+=1

        return self.states, self.policy
  
if __name__ == '__main__':  
    # 创建MDP实例
    p=0.1
    q=0.1
    w=0.1

    print("-----------------------------begin-----------------------------")
    mdp = AverageCostMDP(p=p, q=q, w=w)

    # 运行值迭代算法
    V, policy = mdp.relative_value_iteration()
    log_list = []
    print("------------------------p={} q={} w={} -----------------".format(p,q,w))
    # 输出最优策略
    for (t,s) in V.keys():
        print(f"State (t={t}, s={s}): {'Visit' if policy[(t,s)]==1 else 'Not Visit'}")

        log_str = f"| State (t_k {t} s_k {s}) | {'Visit' if policy[(t,s)]==1 else 'Not Visit'} "
        log_list.append(log_str)
  
    print("------------------------------end------------------------------")

问题4

问题4.1

from problem2 import DiscountedMDP 
import pandas as pd
from collections import defaultdict
import numpy as np
import os


if __name__ == '__main__':
    # 创建MDP实例
    p=0.1
    q=0.1
    w=0.1
    alpha=0.9
      
    print("-----------------------------begin-----------------------------")
    mdp = DiscountedMDP(p=p, q=q, w=w, alpha=alpha)

    # 运行值迭代算法
    V, policy = mdp.value_iteration()
    log_list = []
    print("-------------------p={} q={} w={} alpha={}-----------------".format(p,q,w,alpha))
    # 输出最优策略
    for (t,s) in V.keys():
        print(f"State (t={t}, s={s}): {'Visit' if policy[(t,s)]==1 else 'Not Visit'}")

        log_str = f"| State (t_k {t} s_k {s}) | {'Visit' if policy[(t,s)]==1 else 'Not Visit'} "
        log_list.append(log_str)

        print("------------------------------end------------------------------")
      

问题4.2

from problem2 import DiscountedMDP 
import os
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import rcParams
import re
# 配置图标中文字体
rcParams['font.sans-serif'] = ['SimHei'] 
rcParams['axes.unicode_minus'] = False   


def compare_plot(ind):
    var_list = ['p','q','w','alpha']
    data_excel_list = ['compare_p.xlsx','compare_q.xlsx','compare_w.xlsx','compare_alpha.xlsx']
    save_path_list = ['compare_p.png','compare_q.png','compare_w.png','compare_alpha.png']
    folder_path = os.path.join(os.getcwd(), "discount_data")
    # 读取xlsx文件
    df = pd.read_excel(os.path.join(folder_path,data_excel_list[ind]))  

    # 创建空列表存储提取的数据
    var_values = []
    threshold_0_values = []
    threshold_1_values = []

    # 使用正则表达式提取数据
    for index, row in df.iterrows():
        # 提取var值
        var_match = re.search(r'Var\s+([\d.]+)', str(row[0]))
        if var_match:
            var_value = float(var_match.group(1))
      
        # 提取threshold值
        threshold_match = re.search(r'mini_threshold_(\d+)\s+(\d+)', str(row[0]))
        if threshold_match:
            threshold_num = int(threshold_match.group(1))
            threshold_value = int(threshold_match.group(2))
      
            if threshold_num == 0:
                var_values.append(var_value)
                threshold_0_values.append(threshold_value)
            elif threshold_num == 1:
                threshold_1_values.append(threshold_value)

    # 绘制折线图
    plt.figure(figsize=(10, 6))
    plt.plot(var_values, threshold_0_values, 'b-o', label='s_k = 0')
    plt.plot(var_values, threshold_1_values, 'r-o', label='s_k = 1')

    # 设置图表属性
    plt.xlabel('变量参数值')
    plt.ylabel('阈值(第一个最优策略为Visit的t_k)')
    plt.title('控制变量 {}'.format(var_list[ind]))
    plt.grid(True)
    plt.legend()

    # 保存图表
    script_dir = os.path.dirname(os.path.realpath(__file__))
    save_path = os.path.join(script_dir, 'discount_data',save_path_list[ind])
    plt.savefig(save_path, dpi=300, bbox_inches='tight')

    # 显示图表
    plt.show()
    print("picture have saved")

def run_variable(ind):
# 创建MDP实例
    p=0.1
    q=0.1
    w=0.1
    alpha=0.9

    variable = [0.01,0.02,0.03,0.05,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,0.95,0.97,0.98,0.99] # 用于控制变量,观察各个参数对于结果的影响
    folder_path = os.path.join(os.getcwd(), "discount_data")
    file_path_list = ['compare_p.xlsx','compare_q.xlsx','compare_w.xlsx','compare_alpha.xlsx']
    log_list = []
    # variable = [0.01]
    for v in variable:
        if ind == 0:
            p = v
        elif ind == 1:
            q = v
        elif ind == 2:
            w = v
        elif ind == 3:
            alpha = v
        else:
            print("error ind")
            break

        mini_threshold_0 = 99999
        mini_threshold_1 = 99999

        print("-----------------------------begin-----------------------------")
        mdp = DiscountedMDP(p=p, q=q, w=w, alpha=alpha)

        # 运行值迭代算法
        V, policy = mdp.value_iteration()
      
        print("-------------------p={} q={} w={} alpha={}-----------------".format(p,q,w,alpha))
        # 输出最优策略
        for (t,s) in V.keys():
            print(f"State (t={t}, s={s}): {'Visit' if policy[(t,s)]==1 else 'Not Visit'}")
            if policy[(t,s)] == 1:
                if s == 0:
                    mini_threshold_0 = min(mini_threshold_0,t)
                elif s == 1:
                    mini_threshold_1 = min(mini_threshold_1,t)

        log_str0 = f"| Var {v} | mini_threshold_0 {mini_threshold_0} | 'Visit' "
        log_str1 = f"| Var {v} | mini_threshold_1 {mini_threshold_1} | 'Visit' "
        log_list.append(log_str0)
        log_list.append(log_str1)
        print("------------------------------end------------------------------")

    # 保存结果
    file_path = os.path.join(folder_path, file_path_list[ind])
    df = pd.DataFrame(log_list, columns=["Log"])
    df.to_excel(
        file_path,
        sheet_name="Sheet1",
        index=False,
    )
  

if __name__ == '__main__':
    """
    本问题需要生成不同参数对于最优策略阈值的影响
    最终会生成折线图,位于discount_data文件夹中,如果要复现请提前创建文件夹
    同时修改 ind变量 选取需要控制的变量,0-p  1-q  2-w  3-alpha
    """
    ind = 2 

    # 运行控制变量
    run_variable(ind)
  

    # 画比较图
    compare_plot(ind)

问题4.3

from problem2 import DiscountedMDP 
from problem3 import AverageCostMDP
import os
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import rcParams
import re
# 配置图标中文字体
rcParams['font.sans-serif'] = ['SimHei'] 
rcParams['axes.unicode_minus'] = False   

def run_variable(p,q,w):
    # 创建MDP实例
    alpha=0.9

    variable = [0.01,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,0.95,0.97,0.98,0.99,0.999] # 用于控制变量,观察各个参数对于结果的影响
    log_list = []
    # variable = [0.01]
    for v in variable:
        alpha = v

        mini_threshold_0 = 99999
        mini_threshold_1 = 99999

        print("-----------------------------begin-----------------------------")
        mdp = DiscountedMDP(p=p, q=q, w=w, alpha=alpha)

        # 运行值迭代算法
        V, policy = mdp.value_iteration()
      
        print("-------------------p={} q={} w={} alpha={}-----------------".format(p,q,w,alpha))
        # 输出最优策略
        for (t,s) in V.keys():
            print(f"State (t={t}, s={s}): {'Visit' if policy[(t,s)]==1 else 'Not Visit'}")
            if policy[(t,s)] == 1:
                if s == 0:
                    mini_threshold_0 = min(mini_threshold_0,t)
                elif s == 1:
                    mini_threshold_1 = min(mini_threshold_1,t)

        log_str0 = f"| alpha {alpha} | mini_threshold_0 {mini_threshold_0} | 'Visit' "
        log_str1 = f"| alpha {alpha} | mini_threshold_1 {mini_threshold_1} | 'Visit' "
        log_list.append(log_str0)
        log_list.append(log_str1)
        print("------------------------------end------------------------------")

    return log_list

def avgcost(p,q,w):
    print("-----------------------------begin-----------------------------")
    mdp = AverageCostMDP(p=p, q=q, w=w)

    # 运行值迭代算法
    V, policy = mdp.relative_value_iteration()
    log_list = []
    print("------------------------p={} q={} w={} -----------------".format(p,q,w))
    mini_threshold_0 = 99999
    mini_threshold_1 = 99999
    # 输出最优策略
    for (t,s) in V.keys():
        print(f"State (t={t}, s={s}): {'Visit' if policy[(t,s)]==1 else 'Not Visit'}")
        if policy[(t,s)] == 1:
            if s == 0:
                mini_threshold_0 = min(mini_threshold_0,t)
            elif s == 1:
                mini_threshold_1 = min(mini_threshold_1,t)

    log_str0 = f"| alpha {1} | mini_threshold_0 {mini_threshold_0} | 'Visit' "
    log_str1 = f"| alpha {1} | mini_threshold_1 {mini_threshold_1} | 'Visit' "
    log_list.append(log_str0)
    log_list.append(log_str1)
    return log_list

def compare_plot(p,q,w):
    folder_path = os.path.join(os.getcwd(), "avgcost_data")  
    file_path = os.path.join(folder_path, 'p {} q {} w {}.xlsx'.format(p,q,w))
    # 读取xlsx文件
    df = pd.read_excel(file_path)  
    # 创建空列表存储提取的数据
    var_values = []
    threshold_0_values = []
    threshold_1_values = []

    # 使用正则表达式提取数据
    for index, row in df.iterrows():
        # 提取var值
        var_match = re.search(r'alpha\s+([\d.]+)', str(row[0]))
        if var_match:
            var_value = float(var_match.group(1))
      
        # 提取threshold值
        threshold_match = re.search(r'mini_threshold_(\d+)\s+(\d+)', str(row[0]))
        if threshold_match:
            threshold_num = int(threshold_match.group(1))
            threshold_value = int(threshold_match.group(2))
      
            if threshold_num == 0:
                var_values.append(var_value)
                threshold_0_values.append(threshold_value)
            elif threshold_num == 1:
                threshold_1_values.append(threshold_value)

    # 绘制折线图
    plt.figure(figsize=(10, 6))
    plt.plot(var_values[:-1], threshold_0_values[:-1], 'b-o', label='Disc s_k = 0')
    print(var_values[:-1], threshold_0_values[:-1])
    plt.plot(var_values[-1], threshold_0_values[-1], 'ro', color='green', label='Avg_cost s_k = 0')
    plt.plot(var_values[:-1], threshold_1_values[:-1], 'r-o', label='Disc s_k = 1')
    plt.plot(var_values[-1], threshold_1_values[-1], 'ro', color='yellow', label='Avg_cost s_k = 1')

    # 设置图表属性
    plt.xlabel('变量参数值')
    plt.ylabel('阈值(第一个最优策略为Visit的t_k)')
    plt.title('超参数 p-{} q-{} w-{}'.format(p,q,w))
    plt.grid(True)
    plt.legend()

    # 保存图表
    script_dir = os.path.join(os.getcwd(), "avgcost_data")  
    save_path = os.path.join(script_dir, 'p {} q {} w {}.png'.format(p,q,w))
    plt.savefig(save_path, dpi=300, bbox_inches='tight')

    # 显示图表
    plt.show()
    print("picture have saved")

if __name__ == '__main__':
    p = 0.2
    q = 0.2
    w = 0.05
  
    # 运行折扣问题 alpha逐渐接近1
    log_list1 = run_variable(p,q,w)
    # 运行平均开销问题
    log_list2 = avgcost(p,q,w)
    log_list = log_list1 + log_list2

    # 保存结果
    folder_path = os.path.join(os.getcwd(), "avgcost_data")  
    file_path = os.path.join(folder_path, 'p {} q {} w {}.xlsx'.format(p,q,w))
    df = pd.DataFrame(log_list, columns=["Log"])
    df.to_excel(
        file_path,
        sheet_name="Sheet1",
        index=False,
    )

    # 画比较图
    compare_plot(p,q,w)