import torch
from torch import nn
from torch.nn.init import xavier_uniform_
from torch.nn.init import constant_
import math
import torch.nn.functional as F
from enum import IntEnum
import numpy as np
from .que_base_model import QueBaseModel,QueEmb
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Dim(IntEnum):
batch = 0
seq = 1
feature = 2
class AKTQueNet(nn.Module):
def __init__(self, num_q, num_c, emb_size, n_blocks, dropout, d_ff=256,
kq_same=1, final_fc_dim=512, num_attn_heads=8, separate_qa=False, l2=1e-5, emb_type="qid", emb_path="", pretrain_dim=768):
super().__init__()
"""
Input:
            emb_size (d_model): dimension of the attention blocks
            final_fc_dim: dimension of the final fully connected net before prediction
            num_attn_heads: number of heads in the multi-head attention
            d_ff: dimension of the fully connected net inside each basic block
            kq_same: 1 if key and query share the same projection, else 0
"""
self.model_name = "akt"
self.num_c = num_c
self.dropout = dropout
self.kq_same = kq_same
self.num_q = num_q
self.l2 = l2
self.model_type = self.model_name
self.separate_qa = separate_qa
self.emb_type = emb_type
# embed_l = d_model
if self.num_q > 0:
            self.difficult_param = nn.Embedding(self.num_q+1, 1)  # question difficulty (the Rasch scalar u_q)
            self.q_embed_diff = nn.Embedding(self.num_q+1, emb_size)  # question variation emb (d_ct): summarizes the variation among the questions covering the current concept
            self.qa_embed_diff = nn.Embedding(2 * self.num_q + 1, emb_size)  # interaction variation emb, analogous to the above
if self.separate_qa:
self.qa_embed = nn.Embedding(2*self.num_c+1, emb_size) # interaction emb
else: # false default
self.qa_embed = nn.Embedding(2, emb_size)
self.que_emb = QueEmb(num_q=num_q,num_c=num_c,emb_size=emb_size,emb_type=emb_type,device=device,
emb_path=emb_path,pretrain_dim=pretrain_dim)
# Architecture Object. It contains stack of attention block
        self.model = Architecture(num_q=num_q, n_blocks=n_blocks, n_heads=num_attn_heads, dropout=dropout,
                                  d_model=emb_size, d_feature=emb_size // num_attn_heads, d_ff=d_ff, kq_same=self.kq_same, model_type=self.model_type)
self.out = nn.Sequential(
nn.Linear(emb_size + emb_size,
final_fc_dim), nn.ReLU(), nn.Dropout(self.dropout),
nn.Linear(final_fc_dim, 256), nn.ReLU(
), nn.Dropout(self.dropout),
nn.Linear(256, 1)
)
self.reset()
    def reset(self):
for p in self.parameters():
if p.size(0) == self.num_q+1 and self.num_q > 0:
torch.nn.init.constant_(p, 0.)
    def base_emb(self, q, c, r):
        q_embed_data = self.que_emb(q, c, r)  # BS, seqlen, d_model  (c_ct)
if self.separate_qa:
qa_data = q + self.num_q * r
qa_embed_data = self.qa_embed(qa_data)
else:
# BS, seqlen, d_model # c_ct+ g_rt =e_(ct,rt)
qa_embed_data = self.qa_embed(r)+q_embed_data
return q_embed_data, qa_embed_data
# def forward(self, q_data, target, pid_data=None, qtest=False):
    def forward(self, q, c, r):
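        """
        q, c, r: LongTensors of shape (batch_size, seqlen) holding question ids,
        concept ids and 0/1 responses. Returns the per-step prediction
        probabilities, shape (batch_size, seqlen), and the Rasch L2 regularization loss.
        """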
# Batch First
q_embed_data, qa_embed_data = self.base_emb(q,c,r)
if self.num_q > 0: # have problem id
            q_embed_diff_data = self.q_embed_diff(q)  # d_ct: summarizes the variation among the questions covering the current concept
            pid_embed_data = self.difficult_param(q)  # u_q: difficulty of the current question
q_embed_data = q_embed_data + pid_embed_data * \
q_embed_diff_data # uq *d_ct + c_ct # question encoder
            qa_embed_diff_data = self.qa_embed_diff(
                r)  # f_(ct,rt) or h_rt: the variation vector of the interaction (q_t, r_t)
if self.separate_qa:
qa_embed_data = qa_embed_data + pid_embed_data * \
qa_embed_diff_data # uq* f_(ct,rt) + e_(ct,rt)
else:
qa_embed_data = qa_embed_data + pid_embed_data * \
                    (qa_embed_diff_data+q_embed_diff_data)  # + u_q * (h_rt + d_ct): interaction variation + question variation
            c_reg_loss = (pid_embed_data ** 2.).sum() * self.l2  # L2 regularization on the Rasch difficulty terms
else:
c_reg_loss = 0.
# BS.seqlen,d_model
# Pass to the decoder
# output shape BS,seqlen,d_model or d_model//2
d_output = self.model(q_embed_data, qa_embed_data)
concat_q = torch.cat([d_output, q_embed_data], dim=-1)
output = self.out(concat_q).squeeze(-1)
        preds = torch.sigmoid(output)
return preds, c_reg_loss
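# A minimal usage sketch (not part of the original source): instantiate AKTQueNet and run one
# forward pass on random ids. The _smoke_test name, hyperparameters and random inputs are
# illustrative only; QueEmb and TransformerLayer are assumed to be available as in the rest
# of this module, with QueEmb returning (batch_size, seqlen, emb_size) embeddings.
def _aktquenet_smoke_test(num_q=100, num_c=20, emb_size=64, batch_size=2, seqlen=10):
    net = AKTQueNet(num_q=num_q, num_c=num_c, emb_size=emb_size,
                    n_blocks=1, dropout=0.1).to(device)
    q = torch.randint(0, num_q, (batch_size, seqlen), device=device)  # question ids
    c = torch.randint(0, num_c, (batch_size, seqlen), device=device)  # concept ids
    r = torch.randint(0, 2, (batch_size, seqlen), device=device)      # 0/1 responses
    preds, c_reg_loss = net(q, c, r)
    assert preds.shape == (batch_size, seqlen)
    return preds, c_reg_loss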
class AKTQue(QueBaseModel):
def __init__(self, num_q,num_c, emb_size,n_blocks=1, dropout=0.1, emb_type='qid',kq_same=1, final_fc_dim=512, num_attn_heads=8, separate_qa=False, l2=1e-5,d_ff=256,emb_path="", pretrain_dim=768,device='cpu',seed=0):
model_name = "dkt_que"
super().__init__(model_name=model_name,emb_type=emb_type,emb_path=emb_path,pretrain_dim=pretrain_dim,device=device,seed=seed)
self.model = AKTQueNet(num_q=num_q, num_c=num_c, emb_size=emb_size, n_blocks=n_blocks, dropout=dropout, d_ff=d_ff,
kq_same=kq_same, final_fc_dim=final_fc_dim, num_attn_heads=num_attn_heads, separate_qa=separate_qa,
l2=l2, emb_type=emb_type, emb_path=emb_path, pretrain_dim=pretrain_dim)
self.model = self.model.to(device)
    def train_one_step(self,data):
y,reg_loss,data_new = self.predict_one_step(data,return_details=True)
loss = self.get_loss(y,data_new['rshft'],data_new['sm'])#get loss
print(f"reg_loss is {reg_loss}")
loss = loss+reg_loss
return y,loss
    def predict_one_step(self,data,return_details=False):
data_new = self.batch_to_device(data)
# q, c, r, t, qshft, cshft, rshft, tshft, m, sm, cq, cc, cr, ct = self.batch_to_device(data)
y, reg_loss = self.model(data_new['cq'].long(),data_new['cc'].long(),data_new['cr'].long())
y = y[:,1:]
if return_details:
return y,reg_loss,data_new
else:
return y
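# Note on the batch format used by AKTQue above (descriptive comment; key meanings are
# inferred from the commented-out unpacking in predict_one_step): 'cq', 'cc', 'cr' are the
# full question-id, concept-id and response sequences fed to AKTQueNet, while 'rshft' and
# 'sm' are presumably the one-step-shifted responses and the mask of valid positions
# consumed by get_loss.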
class Architecture(nn.Module):
def __init__(self, num_q, n_blocks, d_model, d_feature,
d_ff, n_heads, dropout, kq_same, model_type):
super().__init__()
"""
            n_blocks : number of stacked blocks in the attention
            d_model : dimension of attention input/output
            d_feature : dimension of the input to each head of the multi-head attention (d_model // n_heads)
            n_heads : number of heads; n_heads * d_feature = d_model
"""
self.d_model = d_model
self.model_type = model_type
if model_type in {'akt'}:
self.blocks_1 = nn.ModuleList([
TransformerLayer(d_model=d_model, d_feature=d_model // n_heads,
d_ff=d_ff, dropout=dropout, n_heads=n_heads, kq_same=kq_same)
for _ in range(n_blocks)
])
self.blocks_2 = nn.ModuleList([
TransformerLayer(d_model=d_model, d_feature=d_model // n_heads,
d_ff=d_ff, dropout=dropout, n_heads=n_heads, kq_same=kq_same)
for _ in range(n_blocks*2)
])
    def forward(self, q_embed_data, qa_embed_data):
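        """
        q_embed_data, qa_embed_data: (batch_size, seqlen, d_model).
        blocks_1 self-attends over the interaction embeddings (knowledge encoder);
        blocks_2 alternates between self-attention over the question embeddings
        (apply_pos=False) and attention of the questions over the encoded
        interactions (the Knowledge Retriever). Returns the retrieved knowledge
        states, shape (batch_size, seqlen, d_model).
        """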
        # q_embed_data, qa_embed_data: (batch_size, seqlen, d_model)
        seqlen, batch_size = q_embed_data.size(1), q_embed_data.size(0)
        qa_pos_embed = qa_embed_data
        q_pos_embed = q_embed_data
        y = qa_pos_embed
        x = q_pos_embed
# encoder
        for block in self.blocks_1:  # encode the (question, response) interactions for time steps 0..t-1
            y = block(mask=1, query=y, key=y, values=y)  # y_t^
flag_first = True
for block in self.blocks_2:
            if flag_first:  # peek current question
                x = block(mask=1, query=x, key=x,
                          values=x, apply_pos=False)  # False: no FFN; this first layer is self-attention only, producing x_t^
                flag_first = False
            else:  # don't peek current response
                x = block(mask=0, query=x, key=x, values=y, apply_pos=True)  # True: FFN + residual + layer norm; the questions attend over the encoded interactions from steps 0..t-1 (the Knowledge Retriever)
                # mask=0: the current response is not visible; together with the zero padding in
                # attention(), the first position carries only question information, no interaction information.
                # print(x[0,0,:])
                flag_first = True
return x
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, d_feature, n_heads, dropout, kq_same, bias=True):
super().__init__()
"""
        Projection layers for keys, queries and values, followed by the distance-aware attention and an output projection layer.
"""
self.d_model = d_model
self.d_k = d_feature
self.h = n_heads
self.kq_same = kq_same
self.v_linear = nn.Linear(d_model, d_model, bias=bias)
self.k_linear = nn.Linear(d_model, d_model, bias=bias)
        if kq_same == 0:  # kq_same is an int flag (1 = key and query share the projection)
self.q_linear = nn.Linear(d_model, d_model, bias=bias)
self.dropout = nn.Dropout(dropout)
self.proj_bias = bias
self.out_proj = nn.Linear(d_model, d_model, bias=bias)
self.gammas = nn.Parameter(torch.zeros(n_heads, 1, 1))
torch.nn.init.xavier_uniform_(self.gammas)
self._reset_parameters()
def _reset_parameters(self):
xavier_uniform_(self.k_linear.weight)
xavier_uniform_(self.v_linear.weight)
        if self.kq_same == 0:
            xavier_uniform_(self.q_linear.weight)
        if self.proj_bias:
            constant_(self.k_linear.bias, 0.)
            constant_(self.v_linear.bias, 0.)
            if self.kq_same == 0:
                constant_(self.q_linear.bias, 0.)
constant_(self.out_proj.bias, 0.)
    def forward(self, q, k, v, mask, zero_pad):
bs = q.size(0)
# perform linear operation and split into h heads
k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
        if self.kq_same == 0:
q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
else:
q = self.k_linear(q).view(bs, -1, self.h, self.d_k)
v = self.v_linear(v).view(bs, -1, self.h, self.d_k)
        # transpose to get dimensions bs * h * sl * d_k
k = k.transpose(1, 2)
q = q.transpose(1, 2)
v = v.transpose(1, 2)
# calculate attention using function we will define next
gammas = self.gammas
scores = attention(q, k, v, self.d_k,
mask, self.dropout, zero_pad, gammas)
# concatenate heads and put through final linear layer
concat = scores.transpose(1, 2).contiguous()\
.view(bs, -1, self.d_model)
output = self.out_proj(concat)
return output
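# A minimal smoke-test sketch (not part of the original module): one forward pass of
# MultiHeadAttention with a causal lower-triangular mask. The _demo_* name, hyperparameters
# and random inputs are illustrative only; shapes and the zero_pad flag follow forward() above.
def _demo_multi_head_attention(bs=2, seqlen=5, d_model=64, n_heads=8):
    mha = MultiHeadAttention(d_model=d_model, d_feature=d_model // n_heads,
                             n_heads=n_heads, dropout=0.1, kq_same=1).to(device)
    x = torch.rand(bs, seqlen, d_model, device=device)
    causal_mask = torch.tril(torch.ones(seqlen, seqlen, device=device)).bool()  # attend to positions <= t
    out = mha(q=x, k=x, v=x, mask=causal_mask, zero_pad=True)
    assert out.shape == (bs, seqlen, d_model)
    return out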
def attention(q, k, v, d_k, mask, dropout, zero_pad, gamma=None):
"""
    This is called by the MultiHeadAttention module to compute the attention values.
    """
    # d_k: dimension per head
scores = torch.matmul(q, k.transpose(-2, -1)) / \
math.sqrt(d_k) # BS, 8, seqlen, seqlen
bs, head, seqlen = scores.size(0), scores.size(1), scores.size(2)
x1 = torch.arange(seqlen).expand(seqlen, -1).to(device)
x2 = x1.transpose(0, 1).contiguous()
with torch.no_grad():
scores_ = scores.masked_fill(mask == 0, -1e32)
scores_ = F.softmax(scores_, dim=-1) # BS,8,seqlen,seqlen
        scores_ = scores_ * mask.float().to(device)  # same result as the previous step (masked entries are already ~0 after the softmax)
distcum_scores = torch.cumsum(scores_, dim=-1) # bs, 8, sl, sl
disttotal_scores = torch.sum(
            scores_, dim=-1, keepdim=True)  # bs, 8, sl, 1  (all ones: each softmax row sums to 1)
# print(f"distotal_scores: {disttotal_scores}")
position_effect = torch.abs(
            x1-x2)[None, None, :, :].float().to(device)  # 1, 1, seqlen, seqlen  absolute position difference
# bs, 8, sl, sl positive distance
dist_scores = torch.clamp(
            (disttotal_scores-distcum_scores)*position_effect, min=0.)  # clamp negative scores to 0
dist_scores = dist_scores.sqrt().detach()
m = nn.Softplus()
    gamma = -1. * m(gamma).unsqueeze(0)  # 1, 8, 1, 1: one gamma (theta in the paper) per head
    # compute exp(gamma * distance), then clamp to the range [1e-5, 1e5]
    total_effect = torch.clamp(torch.clamp(
        (dist_scores*gamma).exp(), min=1e-5), max=1e5)  # the decay term added to Eq. (1) of the AKT paper
scores = scores * total_effect
scores.masked_fill_(mask == 0, -1e32)
scores = F.softmax(scores, dim=-1) # BS,8,seqlen,seqlen
# print(f"before zero pad scores: {scores.shape}")
# print(zero_pad)
if zero_pad:
pad_zero = torch.zeros(bs, head, 1, seqlen).to(device)
        scores = torch.cat([pad_zero, scores[:, :, 1:, :]], dim=2)  # zero out the attention scores of the first position
# print(f"after zero pad scores: {scores}")
scores = dropout(scores)
output = torch.matmul(scores, v)
# import sys
# sys.exit()
return output
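# An illustration (not part of the original code) of the distance decay applied above:
# gamma comes from -softplus, so it is negative, and exp(gamma * distance) shrinks the
# attention given to farther (less related) past positions. The _distance_decay_example
# name and the toy distance values are illustrative only.
def _distance_decay_example():
    gamma = -1.0 * nn.Softplus()(torch.zeros(1))   # same form as above: -softplus(0) ≈ -0.693
    dist = torch.tensor([0., 1., 2., 3.])          # effective distances to past positions
    total_effect = torch.clamp(torch.clamp((dist * gamma).exp(), min=1e-5), max=1e5)
    return total_effect                            # ≈ [1.000, 0.500, 0.250, 0.125]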
class LearnablePositionalEmbedding(nn.Module):
def __init__(self, d_model, max_len=512):
super().__init__()
        # Learnable positional embeddings, randomly initialized.
pe = 0.1 * torch.randn(max_len, d_model)
pe = pe.unsqueeze(0)
self.weight = nn.Parameter(pe, requires_grad=True)
    def forward(self, x):
return self.weight[:, :x.size(Dim.seq), :] # ( 1,seq, Feature)
class CosinePositionalEmbedding(nn.Module):
def __init__(self, d_model, max_len=512):
super().__init__()
# Compute the positional encodings once in log space.
pe = 0.1 * torch.randn(max_len, d_model)
position = torch.arange(0, max_len).unsqueeze(1).float()
div_term = torch.exp(torch.arange(0, d_model, 2).float() *
-(math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.weight = nn.Parameter(pe, requires_grad=False)
    def forward(self, x):
return self.weight[:, :x.size(Dim.seq), :] # ( 1,seq, Feature)
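# A small usage sketch (not part of the original module): both positional embedding
# variants defined above return a (1, seq, d_model) slice that broadcasts over the batch
# when added to a (batch, seq, d_model) input. The _demo_* name and tensor values are
# illustrative only.
def _demo_positional_embeddings(batch_size=4, seqlen=20, d_model=64):
    x = torch.zeros(batch_size, seqlen, d_model)
    learned = LearnablePositionalEmbedding(d_model=d_model, max_len=512)
    fixed = CosinePositionalEmbedding(d_model=d_model, max_len=512)
    assert learned(x).shape == (1, seqlen, d_model)
    assert fixed(x).shape == (1, seqlen, d_model)
    return x + fixed(x)  # broadcasting adds the same positional signal to every sequence in the batch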