import os
import sys
import torch
import torch.nn as nn
import math
import spacy
import random
import numpy as np
import torch.nn.functional as F
import copy
d_model = 4  # note: unused below; the working model dimension is opt["vec_dim"]
opt = {
    "vec_dim": 512,      # model dimension (d_model)
    "heads": 8,          # number of attention heads
    "N": 6,              # number of encoder/decoder layers
    "x_vocab_len": 0,    # filled in after the dataset is built
    "y_vocab_len": 0,
    "sentence_len": 80,  # fixed (padded/truncated) sentence length
    "batchs": 1000,      # number of training batches
    "batch_size": 10,
    "pad": 0,            # index of <pad>
    "sof": 1,            # index of <sof> (start of sentence)
    "eof": 2,            # index of <eof> (end of sentence)
}
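# Embedder wraps nn.Embedding so the embedding table can optionally be frozen
# (grad=False) when the model is only used for inference.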
class Embedder(nn.Module):
def __init__(self, vocab_size, d_model, grad=True):
super().__init__()
self.grad = grad
self.embed = nn.Embedding(vocab_size, d_model)
"""
if os.path.exists("./model/embed_weight.w"):
print(" embed_weight exists , loadding it ...\n")
embed_weight = torch.load("./model/embed_weight.w")
self.embed.weight = nn.Parameter(embed_weight)
"""
self.embed.weight.requires_grad = grad
def forward(self, x):
"""
if self.grad:
torch.save(self.embed.weight, "./model/embed_weight.w")
"""
return self.embed(x)
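# PositionalEncoder adds the fixed sinusoidal position encoding from
# "Attention Is All You Need" to the (scaled) token embeddings.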
class PositionalEncoder(nn.Module):
def __init__(self, d_model, max_seq_len = 80, dropout = 0.1):
super().__init__()
self.d_model = d_model
self.dropout = nn.Dropout(dropout)
        # create the constant 'pe' matrix, whose values depend on the
        # position 'pos' and the embedding dimension index 'i'
        pe = torch.zeros(max_seq_len, d_model)
        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                # standard sinusoidal encoding: sin on even dimensions,
                # cos on odd dimensions, both with wavelength 10000^(i/d_model)
                pe[pos, i] = math.sin(pos / (10000 ** (i / d_model)))
                pe[pos, i + 1] = math.cos(pos / (10000 ** (i / d_model)))
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
# make embeddings relatively larger
x = x * math.sqrt(self.d_model)
        # add the constant positional encoding, trimmed to the sequence length
        seq_len = x.size(1)
        pe = self.pe[:, :seq_len]
        if x.is_cuda:
            pe = pe.cuda()
        x = x + pe
return self.dropout(x)
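# dataset reads a parallel French/English corpus, tokenises it with spaCy,
# builds source/target vocabularies, and serves (optionally shuffled),
# padded batches of token indices.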
class dataset():
def __init__(self, src_path, dst_path, src_lang, dst_lang, train=True, batchs = 10, batch_size = 10, train_rate=0.8, order=False, sentence_len = 80):
#get parameters from init
self.batchs = batchs
self.batch_size = batch_size
self.train_rate = train_rate
self.order = order
self.train = train
self.sentence_len = sentence_len
self.src_weight_path = './model/weight/src.w'
self.dst_weight_path = './model/weight/dst.w'
# get data from file
self.src_path = src_path
self.dst_path = dst_path
        self.src_data = open(self.src_path, encoding='utf-8').read().strip().split('\n')
        self.dst_data = open(self.dst_path, encoding='utf-8').read().strip().split('\n')
        # containers for tokenised sentences, vocabularies and index sequences
self.src_word = []
self.dst_word = []
self.src_vec = []
self.dst_vec = []
self.src_vocab = []
self.dst_vocab = []
self.src_index = []
self.dst_index = []
if os.path.exists( "./model/src_vec.w") and os.path.exists("./model/dst_vec.w"):
print(" src_vec.w existed, load it \n")
self.load_vec()
else:
print(" src_vec.w not exists, create a new one \n")
self.init_vec()
self.save_vec()
# get dataset len
self.dataset_len = len( self.src_vec )
self.src_vocab_len = len( self.src_vocab )
self.dst_vocab_len = len( self.dst_vocab )
# sorted
self.ids = [ ids for ids in range(self.dataset_len) ]
if not order:
random.shuffle(self.ids)
# get training and testing set split by train_rate
self.train_start = 0
self.train_stop = int( np.floor( self.dataset_len * self.train_rate - 1 ) )
self.test_start = self.train_stop + 1
self.test_stop = self.dataset_len - 1
if train:
self.line_start = self.train_start
self.line_stop = self.train_stop
else:
self.line_start = self.test_start
self.line_stop = self.test_stop
# set idx
self.batchs_cnt = 0
        self.line_idx = self.line_start
#print(self.dst_index)
#print("**********\n")
def init_vec(self):
        # tokenise each sentence with spaCy; keep the token texts and their vector norms
nlp = spacy.load("fr_core_news_md")
for token1 in self.src_data:
#print(token1)
nlp_ = nlp(token1)
words = []
vecs = []
for token2 in nlp_:
vecs.append( token2.vector_norm )
words.append( token2.text )
self.src_vec.append( vecs )
self.src_word.append( words )
#print( words )
self.src_vocab = build_vocab( self.src_word, self.src_vec )
for token1 in self.src_word:
indexs = []
token1 = ['<sof>'] + token1 + ['<eof>']
for token2 in token1:
indexs.append( self.src_vocab.index(token2) )
self.src_index.append(indexs)
nlp = spacy.load("en_core_web_md")
for token1 in self.dst_data:
#print(token1)
nlp_ = nlp(token1)
words = []
vecs = []
for token2 in nlp_:
vecs.append( token2.vector_norm )
words.append( token2.text )
self.dst_vec.append( vecs )
self.dst_word.append( words )
#print( words )
self.dst_vocab = build_vocab( self.dst_word, self.dst_vec )
for token1 in self.dst_word:
token1 = ['<sof>'] + token1 + ['<eof>']
indexs = []
for token2 in token1:
indexs.append( self.dst_vocab.index(token2) )
self.dst_index.append(indexs)
#print( self.src_vocab[2] )
#print( self.src_vocab.index('Au') )
def load_vec(self):
self.src_vec = torch.load( "./model/src_vec.w" )
self.dst_vec = torch.load( "./model/dst_vec.w" )
self.src_word = torch.load( "./model/src_word.w" )
self.dst_word = torch.load( "./model/dst_word.w" )
self.src_vocab = torch.load( "./model/src_vocab.w")
self.dst_vocab = torch.load( "./model/dst_vocab.w")
self.src_index = torch.load( "./model/src_index.w")
self.dst_index = torch.load( "./model/dst_index.w")
def save_vec(self):
torch.save( self.src_vec, "./model/src_vec.w")
torch.save( self.dst_vec, "./model/dst_vec.w")
torch.save( self.src_word, "./model/src_word.w")
torch.save( self.dst_word, "./model/dst_word.w")
torch.save( self.src_vocab, "./model/src_vocab.w")
torch.save( self.dst_vocab, "./model/dst_vocab.w")
torch.save( self.src_index, "./model/src_index.w")
torch.save( self.dst_index, "./model/dst_index.w")
def __iter__(self):
return self
def __next__(self):
if (self.batchs_cnt >= self.batchs) & (self.batchs > 0):
self.batchs_cnt = 0
raise StopIteration
self.batchs_cnt += 1
X = []
Y = []
for i in range( self.batch_size):
X_, Y_ = self._next()
            # pad or truncate x to exactly sentence_len tokens
            len_x = len(X_)
            if len_x >= self.sentence_len:
                X_ = X_[:self.sentence_len]
            else:
                X_ = X_ + [0] * (self.sentence_len - len_x)
            # pad or truncate y to exactly sentence_len tokens
            len_y = len(Y_)
            if len_y >= self.sentence_len:
                Y_ = Y_[:self.sentence_len]
            else:
                Y_ = Y_ + [0] * (self.sentence_len - len_y)
X.append(X_)
Y.append(Y_)
X = torch.tensor(X, dtype=torch.long)
Y = torch.tensor(Y, dtype=torch.long)
return X, Y
    def _next(self):
        # wrap around when the end of the current split is reached
        if self.line_idx >= self.line_stop:
            self.line_idx = self.line_start
        idx = self.ids[self.line_idx]  # honour the (optionally shuffled) ordering
        self.line_idx += 1
        return self.src_index[idx], self.dst_index[idx]
    # Note: the vocabulary sizes are also available as the attributes
    # self.src_vocab_len / self.dst_vocab_len set in __init__; accessor
    # methods with the same names would be shadowed by those attributes.
    def get_src_vocab_len(self):
        return self.src_vocab_len
    def get_dst_vocab_len(self):
        return self.dst_vocab_len
def build_vocab( word, vec ):
dic = {'<pad>':0,'<sof>':1,'<eof>':2}
rows = len(word)
for i in range(rows):
cols = len(word[i])
for j in range(cols):
dic[ word[i][j] ] = vec[i][j]
dic = dic.keys()
dic = list(dic)
return dic
def vocab_s2i( vocab, s):
if not s in vocab:
return 0
return vocab.index(s)
def vocab_i2s( vocab, i):
return vocab[i]
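# vocab_softmax builds a one-hot tensor over the vocabulary for every token in s
# (kept for reference; the training loop below uses class indices directly)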
def vocab_softmax( vocab, s):
lines, slen = s.shape
vocab_len = len( vocab)
ret = torch.zeros( (lines, slen, vocab_len), dtype=torch.float )
for l in range(lines):
for c in range(slen):
ret[l, c, s[l,c]] = 1
return ret
# attention
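# scaled dot-product attention: softmax(q·kᵀ / sqrt(dk))·v, where positions
# with mask == 0 are filled with a large negative score before the softmax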
def attention(q,k,v,dk,mask=None,dropout=None):
m = torch.matmul(q, k.transpose(-2,-1)) / math.sqrt(dk)
if mask is not None:
mask = mask.unsqueeze(1)
m = m.masked_fill(mask == 0, -1e9)
#print(m)
m = F.softmax(m, dim = -1)
if dropout is not None:
m = dropout(m)
#print(m)
m = torch.matmul( m, v)
return m
# multi head attention
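# MultiHeadAttention projects q, k, v, splits them into 'heads' sub-spaces of
# size dk = d_model // heads, attends in each head, then recombines the heads.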
class MultiHeadAttention(nn.Module):
def __init__(self, heads, d_model, dropout=0.1):
super().__init__()
self.d_model = d_model
self.dk = d_model // heads
self.h = heads
self.lq = nn.Linear(d_model, d_model)
self.lk = nn.Linear(d_model, d_model)
self.lv = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout( dropout)
self.out = nn.Linear( d_model, d_model)
def forward(self, q, k, v, mask = None):
batch_size = q.size(0)
q = self.lq(q).view(batch_size, -1, self.h, self.dk)
k = self.lk(k).view(batch_size, -1, self.h, self.dk)
v = self.lv(v).view(batch_size, -1, self.h, self.dk)
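        # (batch, seq, d_model) -> (batch, seq, heads, dk); the head dim moves to position 1 below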
q = q.transpose(1,2)
k = k.transpose(1,2)
v = v.transpose(1,2)
atn = attention(q,k,v,self.dk, mask, self.dropout)
ret = atn.transpose(1,2).contiguous().view(batch_size, -1, self.d_model)
ret = self.out(ret)
return ret
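# FeedForward is the position-wise two-layer MLP applied after attention.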
class FeedForward(nn.Module):
def __init__(self, d_model, d_ff=2048, dropout = 0.1):
super().__init__()
self.linear_1 = nn.Linear(d_model, d_ff)
self.dropout = nn.Dropout(dropout)
self.linear_2 = nn.Linear(d_ff, d_model)
def forward(self, x):
x = self.dropout( F.relu( self.linear_1(x) ) )
x = self.linear_2(x)
return x
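# Norm is a hand-rolled layer normalisation with a learnable scale (alpha)
# and shift (bias).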
class Norm(nn.Module):
def __init__(self, d_model, eps = 1e-6):
super().__init__()
self.size = d_model
self.alpha = nn.Parameter(torch.ones(self.size))
self.bias = nn.Parameter(torch.zeros(self.size))
self.eps = eps
def forward(self, x):
norm = self.alpha * (x - x.mean(dim = -1, keepdim=True)) / ( x.std(dim=-1, keepdim=True) + self.eps ) + self.bias
return norm
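# EncoderLayer: pre-norm self-attention and feed-forward sub-layers,
# each wrapped in a residual connection with dropout.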
class EncoderLayer(nn.Module):
def __init__(self, d_model, heads, dropout = 0.1):
super().__init__()
self.norm_1 = Norm(d_model)
self.norm_2 = Norm(d_model)
self.attn = MultiHeadAttention(heads, d_model)
self.ff = FeedForward(d_model)
self.dropout_1 = nn.Dropout(dropout)
self.dropout_2 = nn.Dropout(dropout)
def forward(self, x, mask ):
x2 = self.norm_1(x)
x = x + self.dropout_1( self.attn(x2, x2, x2, mask) )
x2 = self.norm_2(x)
x = x + self.dropout_2( self.ff(x2) )
return x
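# DecoderLayer: masked self-attention over the target, cross-attention over
# the encoder outputs, then a feed-forward sub-layer, all pre-norm + residual.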
class DecoderLayer(nn.Module):
def __init__(self, d_model, heads, dropout = 0.1):
super().__init__()
self.norm_1 = Norm(d_model)
self.norm_2 = Norm(d_model)
self.norm_3 = Norm(d_model)
self.dropout_1 = nn.Dropout(dropout)
self.dropout_2 = nn.Dropout(dropout)
self.dropout_3 = nn.Dropout(dropout)
self.attn_1 = MultiHeadAttention(heads, d_model)
self.attn_2 = MultiHeadAttention(heads, d_model)
#self.ff = FeedForward(d_model).cuda()
self.ff = FeedForward(d_model)
def forward(self, x, e_outputs, x_mask, y_mask):
x2 = self.norm_1(x)
x = x + self.dropout_1( self.attn_1(x2,x2,x2,y_mask) )
x2 = self.norm_2(x)
        # cross-attention uses the source (encoder) padding mask, not the target mask
        x = x + self.dropout_2( self.attn_2(x2, e_outputs, e_outputs, x_mask) )
x2 = self.norm_3(x)
x = x + self.dropout_3( self.ff(x2) )
return x
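# get_clones stacks N independent (deep-copied) instances of a layer.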
def get_clones(module, N):
return nn.ModuleList( [ copy.deepcopy(module) for i in range(N) ] )
class Encoder(nn.Module):
def __init__(self, vocab_size, d_model, N, heads, grad=True):
super().__init__()
self.N = N
self.embed = Embedder( vocab_size, d_model, grad)
self.pe = PositionalEncoder( d_model )
self.layers = get_clones( EncoderLayer(d_model, heads), N)
self.norm = Norm(d_model)
def forward(self, x, mask):
y = self.embed(x)
y = self.pe(y)
for i in range(self.N):
y = self.layers[i](y, mask)
return self.norm(y)
class Decoder(nn.Module):
def __init__(self, vocab_size, d_model, N, heads, grad=True):
super().__init__()
self.N = N
self.embed = Embedder( vocab_size, d_model, grad)
self.pe = PositionalEncoder( d_model )
self.layers = get_clones( DecoderLayer(d_model, heads), N)
self.norm = Norm(d_model)
def forward(self, y, e_outputs, x_mask, y_mask):
x = self.embed(y)
x = self.pe(x)
for i in range(self.N):
x = self.layers[i](x, e_outputs, x_mask, y_mask)
return self.norm(x)
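# Transformer ties the encoder and decoder together and projects the decoder
# output back onto the target vocabulary.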
class Transformer(nn.Module):
def __init__(self, x_vocab_len, y_vocab_len, d_model, N, heads, grad=True):
super().__init__()
self.encoder = Encoder( x_vocab_len, d_model, N, heads, grad)
self.decoder = Decoder( y_vocab_len, d_model, N, heads, grad)
self.out = nn.Linear( d_model, y_vocab_len)
def forward(self, x, y, x_mask, y_mask):
e_outputs = self.encoder(x, x_mask)
#print(" e:", e_outputs.shape)
d_output = self.decoder(y, e_outputs, x_mask, y_mask)
#print(" o:", d_output.shape)
output = self.out(d_output)
return output
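# get_mask builds the padding mask for the source and a combined
# padding + "no peek" (upper-triangular) mask for the target.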
def get_mask(dat, x,y):
x_vocab = dat.src_vocab
y_vocab = dat.dst_vocab
#x_seq = x.transpose(0,1)
x_seq = x
x_pad = vocab_s2i( x_vocab , '<pad>' )
x_msk = ( x_seq != x_pad ).unsqueeze(1)
y_seq = y
y_pad = vocab_s2i( y_vocab, '<pad>' )
y_msk = ( y_seq != y_pad ).unsqueeze(1)
size = y_seq.size(1)
nopeak_msk = np.triu( np.ones( (1, size, size) ), k = 1).astype('uint8')
    nopeak_msk = torch.from_numpy(nopeak_msk) == 0
y_msk = y_msk & nopeak_msk
return x_msk, y_msk
# build the dataset and fill the vocabulary sizes into opt
dat = dataset("./dataset/french.txt", "./dataset/english.txt", "fr", "en", True, opt["batchs"], opt["batch_size"])
opt["x_vocab_len"] = len( dat.src_vocab )
opt["y_vocab_len"] = len( dat.dst_vocab )
print("Options:", opt)
def train():
model = Transformer( opt["x_vocab_len"], opt["y_vocab_len"], opt["vec_dim"], opt["N"], opt["heads"], grad=True )
if os.path.exists( "./model/transformer.m"):
print(" loading transformer model ... \n")
model.load_state_dict( torch.load( "./model/transformer.m") )
else:
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
optim = torch.optim.Adam( model.parameters(), lr = 0.0001, betas = (0.9, 0.98), eps = 1e-9)
#print(len(dat.src_vec))
#print(x.shape)
#print(" vocab:", dat.src_vocab_len, dat.dst_vocab_len)
#src_emb = Embedder(dat.src_vocab_len, opt["vec_dim"])
#dst_emb = Embedder(dat.dst_vocab_len, opt["vec_dim"])
#pos = PositionalEncoder(opt["vec_dim"])
x_vocab = dat.src_vocab
y_vocab = dat.dst_vocab
print( "len of x_vocab:", len(x_vocab), " len of y_vocab:", len(y_vocab) )
total_loss = 0
for i in range(opt["batchs"]):
#print("\n ********: ", i, "\n")
x,y = next(dat)
        # teacher forcing: the decoder input drops the last token and the
        # target drops the first, so position t predicts token t+1
        y_input = y[:, :-1]
        y_target = y[:, 1:]
        x_msk, y_msk = get_mask(dat, x, y_input)
        preds = model(x, y_input, x_msk, y_msk)
        optim.zero_grad()
        # F.cross_entropy expects raw logits and class indices, so no softmax
        # is applied here; <pad> positions are ignored in the loss
        loss = F.cross_entropy(preds.view(-1, preds.size(-1)),
                               y_target.contiguous().view(-1),
                               ignore_index=opt["pad"])
#print("loss:", loss)
loss.backward()
optim.step()
#print(loss.data)
        total_loss += loss.item()
        print(" [%d / %d] : loss: %.3f \n" % (i, opt["batchs"], loss.item()))
torch.save( model.state_dict(), "./model/transformer.m" )
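# translate: greedily decode an English translation of one French input
# sentence with the trained model.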
def translate(model, sentence):
max_len = 80
x_vocab = dat.src_vocab
y_vocab = dat.dst_vocab
    # tokenise the input sentence with spaCy and map tokens to vocabulary indices
nlp = spacy.load("fr_core_news_md")
token1 = nlp(sentence)
words = []
index = []
for token2 in token1:
words.append( token2.text )
index.append( vocab_s2i( x_vocab, token2.text ) )
print("words:", words)
print("index:", index)
    # pad or truncate x to max_len and add a batch dimension
    len_x = len(index)
    if len_x >= max_len:
        x = index[:max_len]
    else:
        x = index + [0] * (max_len - len_x)
    x = torch.tensor([x], dtype=torch.long)   # shape (1, max_len)
    x_pad = vocab_s2i(x_vocab, '<pad>')
    x_msk = (x != x_pad).unsqueeze(-2)        # shape (1, 1, max_len)
    e_outputs = model.encoder(x, x_msk)
    # greedy decoding: start from <sof> and generate one token at a time
    y = torch.zeros((1, max_len), dtype=torch.long)
    y[0, 0] = vocab_s2i(y_vocab, '<sof>')
    y_msk = torch.zeros((1, max_len), dtype=torch.bool)
    y_eof = vocab_s2i(y_vocab, '<eof>')
    outputs = ''
    stop = max_len
    for i in range(1, max_len):
        y_msk[:, :i] = True                   # only positions < i are visible
        y_ = model.out(model.decoder(y, e_outputs, x_msk, y_msk))
        y_ = F.softmax(y_, dim=-1)
        # the decoder output at position i-1 predicts the token for position i
        val, idx = y_[:, i - 1].topk(1)
        y[0, i] = idx[0][0]
        if idx[0][0] == y_eof:
            stop = i
            break
    # skip the leading <sof> and everything from <eof> onwards
    for i in range(1, stop):
        outputs = outputs + ' ' + vocab_i2s(y_vocab, y[0, i].item())
    print(outputs)
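# run: load the saved transformer (if present) and translate two sample sentences.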
def run():
model = Transformer( opt["x_vocab_len"], opt["y_vocab_len"], opt["vec_dim"], opt["N"], opt["heads"], grad=False )
if os.path.exists( "./model/transformer.m"):
print(" loading transformer model ... \n")
model.load_state_dict( torch.load( "./model/transformer.m") )
    model.eval()
    # inference only: no gradients are needed
    with torch.no_grad():
        translate(model, "Corse !")
        translate(model, "Une empreinte carbone est la somme de pollution au dioxyde de carbone que nous produisons par nos activités. Certaines personnes essaient de réduire leur empreinte carbone parce qu'elles sont inquiètes du changement climatique.")
"""
For training run this command:
python face_cnn.py train
For testing fun this command:
python face_cnn.py test
"""
if __name__ == '__main__':
args = sys.argv[1:]
print( args, len(args))
if (len(args) == 1) & (args[0] == "train"):
train()
elif (len(args) == 1) & (args[0] == "run"):
run()
else:
test()