Source code for pykt.datasets.dkt_forget_dataloader

from ast import Assert
import os, sys
from re import L
import pandas as pd
import torch
from torch.utils.data import Dataset
from torch.cuda import FloatTensor, LongTensor
import numpy as np

ModelConf = {
    "dkt_forget": ["timestamps"]
}

[docs]class DktForgetDataset(Dataset): """Dataset for dkt_forget can use to init dataset for: dkt_forget train data, valid data common test data(concept level evaluation), real educational scenario test data(question level evaluation). Args: file_path (str): train_valid/test file path input_type (list[str]): the input type of the dataset, values are in ["questions", "concepts"] folds (set(int)): the folds used to generate dataset, -1 for test data qtest (bool, optional): is question evaluation or not. Defaults to False. """ def __init__(self, file_path, input_type, folds, qtest=False): super(DktForgetDataset, self).__init__() self.sequence_path = file_path self.input_type = input_type self.qtest = qtest folds = list(folds) folds_str = "_" + "_".join([str(_) for _ in folds]) if self.qtest: processed_data = file_path + folds_str + "_dkt_forget_qtest.pkl" else: processed_data = file_path + folds_str + "_dkt_forget.pkl" if not os.path.exists(processed_data): print(f"Start preprocessing {file_path} fold: {folds_str}...") if self.qtest: self.dori, self.dgaps, self.max_rgap, self.max_sgap, self.max_pcount, self.dqtest = \ self.__load_data__(self.sequence_path, folds) save_data = [self.dori, self.dgaps, self.max_rgap, self.max_sgap, self.max_pcount, self.dqtest] else: self.dori, self.dgaps, self.max_rgap, self.max_sgap, self.max_pcount = \ self.__load_data__(self.sequence_path, folds) save_data = [self.dori, self.dgaps, self.max_rgap, self.max_sgap, self.max_pcount] pd.to_pickle(save_data, processed_data) else: print(f"Read data from processed file: {processed_data}") if self.qtest: self.dori, self.dgaps, self.max_rgap, self.max_sgap, self.max_pcount, self.dqtest = pd.read_pickle(processed_data) else: self.dori, self.dgaps, self.max_rgap, self.max_sgap, self.max_pcount = pd.read_pickle(processed_data) print(f"file path: {file_path}, qlen: {len(self.dori['qseqs'])}, clen: {len(self.dori['cseqs'])}, rlen: {len(self.dori['rseqs'])}, \ max_rgap: {self.max_rgap}, max_sgap: {self.max_sgap}, max_pcount: {self.max_pcount}") def __len__(self): """return the dataset length Returns: int: the length of the dataset """ return len(self.dori['rseqs']) def __getitem__(self, index): """ Args: index (int): the index of the data want to get Returns: (tuple): tuple containing: - ** q_seqs (torch.tensor)**: question id sequence of the 0~seqlen-2 interactions - **c_seqs (torch.tensor)**: knowledge concept id sequence of the 0~seqlen-2 interactions - **r_seqs (torch.tensor)**: response id sequence of the 0~seqlen-2 interactions - **qshft_seqs (torch.tensor)**: question id sequence of the 1~seqlen-1 interactions - **cshft_seqs (torch.tensor)**: knowledge concept id sequence of the 1~seqlen-1 interactions - **rshft_seqs (torch.tensor)**: response id sequence of the 1~seqlen-1 interactions - **mask_seqs (torch.tensor)**: masked value sequence, shape is seqlen-1 - **select_masks (torch.tensor)**: is select to calculate the performance or not, 0 is not selected, 1 is selected, only available for 1~seqlen-1, shape is seqlen-1 - **dcur (dict)**: used only self.qtest is True, for question level evaluation """ # q_seqs, qshft_seqs, c_seqs, cshft_seqs = torch.tensor([]), torch.tensor([]), torch.tensor([]), torch.tensor([]) dcur = dict() mseqs = self.dori["masks"][index] for key in self.dori: if key in ["masks", "smasks"]: continue if len(self.dori[key]) == 0: dcur[key] = self.dori[key] dcur["shft_"+key] = self.dori[key] continue # print(f"key: {key}, len: {len(self.dori[key])}") seqs = self.dori[key][index][:-1] * mseqs shft_seqs = self.dori[key][index][1:] * mseqs dcur[key] = seqs dcur["shft_"+key] = shft_seqs dcur["masks"] = mseqs dcur["smasks"] = self.dori["smasks"][index] dcurgaps = dict() for key in self.dgaps: seqs = self.dgaps[key][index][:-1] * mseqs shft_seqs = self.dgaps[key][index][1:] * mseqs dcurgaps[key] = seqs dcurgaps["shft_"+key] = shft_seqs if not self.qtest: return dcur, dcurgaps else: dqtest = dict() for key in self.dqtest: dqtest[key] = self.dqtest[key][index] return dcur, dcurgaps, dqtest def __load_data__(self, sequence_path, folds, pad_val=-1): """ Args: sequence_path (str): file path of the sequences folds (list[int]): pad_val (int, optional): pad value. Defaults to -1. Returns: (tuple): tuple containing: - **q_seqs (torch.tensor)**: question id sequence of the 0~seqlen-1 interactions - **c_seqs (torch.tensor)**: knowledge concept id sequence of the 0~seqlen-1 interactions - **r_seqs (torch.tensor)**: response id sequence of the 0~seqlen-1 interactions - **mask_seqs (torch.tensor)**: masked value sequence, shape is seqlen-1 - **select_masks (torch.tensor)**: is select to calculate the performance or not, 0 is not selected, 1 is selected, only available for 1~seqlen-1, shape is seqlen-1 - **max_rgap (int)**: max num of the repeated time gap - **max_sgap (int)**: max num of the sequence time gap - **max_pcount (int)**: max num of the past exercise counts - **dqtest (dict)**: not null only self.qtest is True, for question level evaluation """ dori = {"qseqs": [], "cseqs": [], "rseqs": [], "tseqs": [], "utseqs": [], "smasks": []} # seq_qids, seq_cids, seq_rights, seq_mask = [], [], [], [] # repeated_gap, sequence_gap, past_counts = [], [], [] dgaps = {"rgaps": [], "sgaps": [], "pcounts": []} max_rgap, max_sgap, max_pcount = 0, 0, 0 df = pd.read_csv(sequence_path) df = df[df["fold"].isin(folds)] dqtest = {"qidxs": [], "rests":[], "orirow":[]} flag = True for key in ModelConf["dkt_forget"]: if key not in df.columns: print(f"key: {key} not in data: {self.sequence_path}! can not run dkt_forget model!") flag = False assert flag == True for i, row in df.iterrows(): #use kc_id or question_id as input if "concepts" in self.input_type: dori["cseqs"].append([int(_) for _ in row["concepts"].split(",")]) if "questions" in self.input_type: dori["qseqs"].append([int(_) for _ in row["questions"].split(",")]) if "timestamps" in row: dori["tseqs"].append([int(_) for _ in row["timestamps"].split(",")]) if "usetimes" in row: dori["utseqs"].append([int(_) for _ in row["usetimes"].split(",")]) dori["rseqs"].append([int(_) for _ in row["responses"].split(",")]) dori["smasks"].append([int(_) for _ in row["selectmasks"].split(",")]) rgap, sgap, pcount = self.calC(row) dgaps["rgaps"].append(rgap) dgaps["sgaps"].append(sgap) dgaps["pcounts"].append(pcount) max_rgap = max(rgap) if max(rgap) > max_rgap else max_rgap max_sgap = max(sgap) if max(sgap) > max_sgap else max_sgap max_pcount = max(pcount) if max(pcount) > max_pcount else max_pcount if self.qtest: dqtest["qidxs"].append([int(_) for _ in row["qidxs"].split(",")]) dqtest["rests"].append([int(_) for _ in row["rest"].split(",")]) dqtest["orirow"].append([int(_) for _ in row["orirow"].split(",")]) for key in dori: if key not in ["rseqs"]:#in ["smasks", "tseqs"]: dori[key] = LongTensor(dori[key]) else: dori[key] = FloatTensor(dori[key]) mask_seqs = (dori["cseqs"][:,:-1] != pad_val) * (dori["cseqs"][:,1:] != pad_val) dori["masks"] = mask_seqs dori["smasks"] = (dori["smasks"][:, 1:] != pad_val) # q_seqs, c_seqs, r_seqs = FloatTensor(seq_qids), FloatTensor(seq_cids), FloatTensor(seq_rights) for key in dgaps: dgaps[key] = LongTensor(dgaps[key]) if self.qtest: for key in dqtest: dqtest[key] = LongTensor(dqtest[key])[:, 1:] return dori, dgaps, max_rgap, max_sgap, max_pcount, dqtest return dori, dgaps, max_rgap, max_sgap, max_pcount
[docs] def log2(self, t): import math return round(math.log(t+1, 2))
[docs] def calC(self, row): repeated_gap, sequence_gap, past_counts = [], [], [] uid = row["uid"] # default: concepts skills = row["concepts"].split(",") if "concepts" in self.input_type else row["questions"].split(",") timestamps = row["timestamps"].split(",") dlastskill, dcount = dict(), dict() pret = None for s, t in zip(skills, timestamps): s, t = int(s), int(t) if s not in dlastskill or s == -1: curRepeatedGap = 0 else: curRepeatedGap = self.log2((t - dlastskill[s]) / 1000 / 60) + 1 # minutes dlastskill[s] = t repeated_gap.append(curRepeatedGap) if pret == None or t == -1: curLastGap = 0 else: curLastGap = self.log2((t - pret) / 1000 / 60) + 1 pret = t sequence_gap.append(curLastGap) dcount.setdefault(s, 0) past_counts.append(self.log2(dcount[s])) dcount[s] += 1 return repeated_gap, sequence_gap, past_counts