Class 6 DNN-HMM Assignment Code Walkthrough

Training Pipeline

[Figure: training pipeline diagrams (original images not preserved)]
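
In outline, every mini-batch goes through a forward pass, a cross-entropy loss, a backward pass, and an SGD parameter update. The sketch below condenses the per-batch step implemented in train() further down (same variable names as the code; it adds nothing beyond what the full listing does):

# One training step on a mini-batch, condensed from train() below
out = dnn.forward(input)                          # posteriors, shape (batch_size, 11)
one_hot_label = one_hot(label, out.shape[1])      # one-hot targets, shape (batch_size, 11)
loss = -np.sum(np.log(out + 1e-20) * one_hot_label) / out.shape[0]   # cross entropy
dnn.backward(out - one_hot_label)                 # gradient w.r.t. the pre-softmax activation
dnn.update()                                      # SGD step on every FullyConnect layer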

Code

# Author: Sining Sun, Zhanheng Yang, Binbin Zhang

import math
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import kaldi_io
from utils import *

# Build the mapping from target symbols ('Z' and 'O' stand for zero/oh) to class indices
targets_list = ['Z', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'O']
targets_mapping = {}
for i, x in enumerate(targets_list):
    targets_mapping[x] = i

def plot_loss(avg_loss, filename):
    fig = plt.figure(figsize=(20, 10))
    plt.plot(avg_loss)
    plt.xlabel('epochs')
    plt.ylabel('loss')
    plt.savefig(filename)
    plt.show()

# Base Layer class with forward, backward, set_learning_rate and update methods
class Layer(object):
    def forward(self, input):
        ''' Forward function by input
        Args:
            input: input, B * N matrix, B for batch size
        Returns:
            output when applied this layer
        '''
        raise NotImplementedError('forward is not implemented')

    def backward(self, input, output, d_output):
        ''' Compute gradient of this layer's input by (input, output, d_output)
            as well as compute the gradient of the parameter of this layer
        Args:
            input: input of this layer
            output: output of this layer
            d_output: accumulated gradient from final output to this
                layer's output
        Returns:
            accumulated gradient from final output to this layer's input
        '''
        raise NotImplementedError('backward is not implemented')

    def set_learning_rate(self, lr):
        ''' Set learning rate of this layer'''
        self.learning_rate = lr

    def update(self):
        ''' Update this layer's parameters if it has any, or do nothing
        '''
# ReLU activation layer
class ReLU(Layer):
    def forward(self, input):
        # BEGIN_LAB
        tem_mat = np.maximum(0, input)
        # assert: continues when the condition holds, raises an exception otherwise
        assert (tem_mat.shape == input.shape)
        return tem_mat.T
        # END_LAB

    def backward(self, input, output, d_output):
        # BEGIN_LAB
        # Pass the gradient through where input > 0, zero it elsewhere
        d_mat = np.array(d_output, copy=True)
        d_mat[input <= 0] = 0
        assert (d_mat.shape == input.shape)
        return d_mat.T
        # END_LAB

# Fully connected (affine) layer
class FullyConnect(Layer):
    def __init__(self, in_dim, out_dim):
        # He initialization for the weights, zeros for the bias
        self.w = np.random.randn(out_dim, in_dim) * np.sqrt(2.0 / in_dim)
        self.b = np.zeros((out_dim, 1))
        self.dw = np.zeros((out_dim, in_dim))
        self.db = np.zeros((out_dim, 1))

    def forward(self, input):
        # BEGIN_LAB
        # Feed-forward (FNN) computation: input is (B, in_dim), output is (out_dim, B)
        out_mat = np.dot(self.w, input.T) + self.b
        assert out_mat.shape == (self.w.shape[0], input.shape[0])
        return out_mat
        # END_LAB

    def backward(self, input, output, d_output):
        batch_size = input.shape[0]
        in_diff = None
        # BEGIN_LAB, compute in_diff/dw/db here
        # Core chain-rule formulas of backpropagation; d_output is (out_dim, B),
        # input is (B, in_dim). Batch-size normalization is applied once, below.
        self.dw = np.dot(d_output, input)
        # axis=1 sums over the batch (column) dimension, axis=0 would sum over rows
        self.db = np.sum(d_output, axis=1, keepdims=True)
        outt_mat = np.dot(self.w.T, d_output)

        assert (outt_mat.shape == input.T.shape)
        assert (self.dw.shape == self.w.shape)
        assert (self.db.shape == self.b.shape)
        in_diff = outt_mat.T
        # END_LAB
        # Normalize dw/db by batch size
        self.dw = self.dw / batch_size
        self.db = self.db / batch_size
        return in_diff

    # Update the weights and bias (plain SGD)
    def update(self):
        self.w = self.w - self.learning_rate * self.dw
        self.b = self.b - self.learning_rate * self.db

# Softmax layer: normalize the scores into probabilities
class Softmax(Layer):
    def forward(self, input):
        _input = input.T
        row_max = _input.max(axis=1).reshape(_input.shape[0], 1)
        # Not plain softmax: subtract the per-row max first to guard against overflow/underflow
        x = _input - row_max
        return np.exp(x) / np.sum(np.exp(x), axis=1).reshape(x.shape[0], 1)

    def backward(self, input, output, d_output):
        ''' Directly return the d_output as we show below, the grad is to
            the activation (input) of softmax
        '''
        return d_output
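
# Subtracting the row max does not change the result, since
# exp(x - m) / sum(exp(x - m)) == exp(x) / sum(exp(x)); it only keeps np.exp from
# overflowing. For example, with the hypothetical scores [1000.0, 1001.0], np.exp
# would overflow directly, but after subtracting the max we evaluate
# exp([-1, 0]) / sum(exp([-1, 0])) ≈ [0.269, 0.731].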


class DNN(object):
    def __init__(self, in_dim, out_dim, hidden_dim, num_hidden):
        # Stack the layers in order
        self.layers = []
        # Input layer: FullyConnect(in_dim, hidden_dim)
        self.layers.append(FullyConnect(in_dim, hidden_dim))
        # followed by a ReLU activation
        self.layers.append(ReLU())
        # Hidden layers: each one is a FullyConnect layer followed by a ReLU
        for i in range(num_hidden):
            self.layers.append(FullyConnect(hidden_dim, hidden_dim))
            self.layers.append(ReLU())
        # Output layer: FullyConnect(hidden_dim, out_dim)
        self.layers.append(FullyConnect(hidden_dim, out_dim))
        # and a final Softmax to turn the scores into probabilities
        self.layers.append(Softmax())

    # Set the learning rate of every layer
    def set_learning_rate(self, lr):
        for layer in self.layers:
            layer.set_learning_rate(lr)

    def forward(self, input):
        self.forward_buf = []
        # Seed the buffer with the raw input, so forward_buf[i] is the input of layers[i]
        out = input
        self.forward_buf.append(out)
        for i in range(len(self.layers)):
            # Feed-forward pass through each layer in turn
            out = self.layers[i].forward(out)
            self.forward_buf.append(out)
        assert (len(self.forward_buf) == len(self.layers) + 1)
        return out

    def backward(self, grad):
        '''
        Args:
            grad: the grad is to the activation before softmax
        '''
        self.backward_buf = [None] * len(self.layers)
        self.backward_buf[len(self.layers) - 1] = grad
        for i in range(len(self.layers) - 2, -1, -1):
            grad = self.layers[i].backward(self.forward_buf[i],
                                           self.forward_buf[i + 1],
                                           self.backward_buf[i + 1].T)
            self.backward_buf[i] = grad

    def update(self):
        for layer in self.layers:
            layer.update()
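
# For the network built in main() below, DNN(429, 11, 128, 1), this stacks:
#   FullyConnect(429, 128) -> ReLU()
#   FullyConnect(128, 128) -> ReLU()
#   FullyConnect(128, 11)  -> Softmax()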


# One-hot encoding of the labels
def one_hot(labels, total_label):
    # Build a (labels.shape[0], total_label) matrix; total_label is 11, the size of
    # the digit vocabulary (the training set has 18593 samples in total, but one_hot
    # is called on one mini-batch at a time in train())
    output = np.zeros((labels.shape[0], total_label))
    for i in range(labels.shape[0]):
        # Set the single position of the i-th label's class to 1
        output[i][labels[i]] = 1.0
    return output
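
# For example, one_hot(np.array([2, 0]), 4) returns
#   [[0., 0., 1., 0.],
#    [1., 0., 0., 0.]]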


def train(dnn):
    utt2feat, utt2target = read_feats_and_targets('train/feats.scp',
                                                  'train/text')
    # Use the helpers from utils to read the extracted features and build the
    # input and label matrices
    inputs, labels = build_input(targets_mapping, utt2feat, utt2target)
    num_samples = inputs.shape[0]
    # Shuffle data
    # np.random.permutation gives a random ordering of the sample indices
    permute = np.random.permutation(num_samples)
    inputs = inputs[permute]
    labels = labels[permute]
    # Number of training epochs
    num_epochs = 200
    # Mini-batch size
    batch_size = 100
    avg_loss = np.zeros(num_epochs)
    for i in range(num_epochs):
        # Start from sample 0 in each epoch
        cur = 0
        # Sweep the shuffled training set one mini-batch at a time
        while cur < num_samples:
            # Each step processes at most one batch, without running past the end
            end = min(cur + batch_size, num_samples)
            # input has shape (batch_size, 429), label has shape (batch_size,)
            input = inputs[cur:end]
            label = labels[cur:end]
            # Step1: forward
            out = dnn.forward(input)
            # out has shape (batch_size, 11); one-hot encode the labels to match
            one_hot_label = one_hot(label, out.shape[1])

            # Step2: Compute cross entropy (CE) loss and backward
            loss = -np.sum(np.log(out + 1e-20) * one_hot_label) / out.shape[0]

            # The grad is to activation before softmax
            grad = out - one_hot_label
            # Back-propagate through the network
            dnn.backward(grad)
            # Step3: update parameters
            dnn.update()
            print('Epoch {} num_samples {} loss {}'.format(i, cur, loss))
            avg_loss[i] += loss
            cur += batch_size
        avg_loss[i] /= math.ceil(num_samples / batch_size)
    plot_loss(avg_loss, 'loss.png')


def test(dnn):
    utt2feat, utt2target = read_feats_and_targets('test/feats.scp',
                                                  'test/text')
    total = len(utt2feat)
    correct = 0
    for utt in utt2feat:
        t = utt2target[utt]
        ark = utt2feat[utt]
        # Read the feature matrix of this utterance and splice +/- 5 context frames
        mat = kaldi_io.read_mat(ark)
        mat = splice(mat, 5, 5)
        # Average the per-frame posteriors, then take the most likely word
        posterior = dnn.forward(mat)
        posterior = np.sum(posterior, axis=0) / float(mat.shape[0])
        predict = targets_list[np.argmax(posterior)]
        if t == predict: correct += 1
        print('label: {} predict: {}'.format(t, predict))
    print('Acc: {}'.format(float(correct) / total))


def main():
    # Fix the random seed so that every run generates the same random numbers
    np.random.seed(777)
    # We splice the raw feature with the left 5 frames and the right 5 frames,
    # so the input dimension here is 39 * (5 + 1 + 5) = 429
    # DNN(in_dim, out_dim, hidden_dim, num_hidden)
    dnn = DNN(429, 11, 128, 1)
    dnn.set_learning_rate(1e-2)
    train(dnn)
    test(dnn)


if __name__ == '__main__':
    main()
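
Running the script end to end builds the 429-128-128-11 network, trains it for 200 epochs with mini-batch SGD (batch size 100, learning rate 1e-2), saves the averaged loss curve to loss.png, and finally prints each test utterance's label and prediction together with the overall accuracy.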

Theoretical Background

[Figure: theory notes (original image not preserved)]
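
The key identity behind grad = out - one_hot_label in train() is the gradient of the cross-entropy loss with respect to the pre-softmax activation. A minimal derivation (notation chosen here, not taken from the original figure): with logits z, softmax output p and one-hot label y,

$$
p_k = \frac{e^{z_k}}{\sum_j e^{z_j}}, \qquad
L = -\sum_k y_k \log p_k, \qquad
\frac{\partial L}{\partial z_k} = p_k \sum_j y_j - y_k = p_k - y_k,
$$

since the one-hot label satisfies $\sum_j y_j = 1$. This is exactly out - one_hot_label, which is why Softmax.backward can simply pass the incoming gradient through unchanged.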
