23.7. ユーティリティ関数とクラス¶
この節では、本書で使用するユーティリティ関数とクラスの実装を示す。
import inspect
import collections
from d2l import torch as d2l
from IPython import display
from torch import nn
import inspect
import collections
from d2l import mxnet as d2l
from IPython import display
from mxnet import autograd, gluon, np, npx
from mxnet.gluon import nn
import random
npx.set_np()
import inspect
from IPython import display
import collections
from d2l import jax as d2l
import jax
No GPU/TPU found, falling back to CPU. (Set TF_CPP_MIN_LOG_LEVEL=0 and rerun for more info.)
import inspect
from IPython import display
import collections
from d2l import tensorflow as d2l
import tensorflow as tf
ハイパーパラメータ。
@d2l.add_to_class(d2l.HyperParameters) #@save
def save_hyperparameters(self, ignore=[]):
"""関数の引数をクラス属性として保存する。"""
frame = inspect.currentframe().f_back
_, _, _, local_vars = inspect.getargvalues(frame)
self.hparams = {k:v for k, v in local_vars.items()
if k not in set(ignore+['self']) and not k.startswith('_')}
for k, v in self.hparams.items():
setattr(self, k, v)
進捗バー。
@d2l.add_to_class(d2l.ProgressBoard) #@save
def draw(self, x, y, label, every_n=1):
Point = collections.namedtuple('Point', ['x', 'y'])
if not hasattr(self, 'raw_points'):
self.raw_points = collections.OrderedDict()
self.data = collections.OrderedDict()
if label not in self.raw_points:
self.raw_points[label] = []
self.data[label] = []
points = self.raw_points[label]
line = self.data[label]
points.append(Point(x, y))
if len(points) != every_n:
return
mean = lambda x: sum(x) / len(x)
line.append(Point(mean([p.x for p in points]),
mean([p.y for p in points])))
points.clear()
if not self.display:
return
d2l.use_svg_display()
if self.fig is None:
self.fig = d2l.plt.figure(figsize=self.figsize)
plt_lines, labels = [], []
for (k, v), ls, color in zip(self.data.items(), self.ls, self.colors):
plt_lines.append(d2l.plt.plot([p.x for p in v], [p.y for p in v],
linestyle=ls, color=color)[0])
labels.append(k)
axes = self.axes if self.axes else d2l.plt.gca()
if self.xlim: axes.set_xlim(self.xlim)
if self.ylim: axes.set_ylim(self.ylim)
if not self.xlabel: self.xlabel = self.x
axes.set_xlabel(self.xlabel)
axes.set_ylabel(self.ylabel)
axes.set_xscale(self.xscale)
axes.set_yscale(self.yscale)
axes.legend(plt_lines, labels)
display.display(self.fig)
display.clear_output(wait=True)
FrozenLake 環境を追加
def frozen_lake(seed): #@save
# この環境については https://www.gymlibrary.dev/environments/toy_text/frozen_lake/ を参照
# env.P.items の処理方法は https://sites.google.com/view/deep-rl-bootcamp/labs を参考にしている
import gym
env = gym.make('FrozenLake-v1', is_slippery=False)
env.seed(seed)
env.action_space.np_random.seed(seed)
env.action_space.seed(seed)
env_info = {}
env_info['desc'] = env.desc # 各グリッド要素の意味を指定する2次元配列
env_info['num_states'] = env.nS # 観測/状態の数、または観測/状態次元
env_info['num_actions'] = env.nA # 行動数、または行動次元
# (遷移確率, 次状態, 報酬, 終了) タプルのインデックスを定義
env_info['trans_prob_idx'] = 0 # 遷移確率要素のインデックス
env_info['nextstate_idx'] = 1 # 次状態要素のインデックス
env_info['reward_idx'] = 2 # 報酬要素のインデックス
env_info['done_idx'] = 3 # 終了要素のインデックス
env_info['mdp'] = {}
env_info['env'] = env
for (s, others) in env.P.items():
# others(s) = {a0: [ (p(s'|s,a0), s', reward, done),...], a1:[...], ...}
for (a, pxrds) in others.items():
# pxrds is [(p1,next1,r1,d1),(p2,next2,r2,d2),..].
# e.g. [(0.3, 0, 0, False), (0.3, 0, 0, False), (0.3, 4, 1, False)]
env_info['mdp'][(s,a)] = pxrds
return env_info
def load_array(data_arrays, batch_size, is_train=True): #@save
"""Gluon のデータイテレータを構築する。"""
dataset = gluon.data.ArrayDataset(*data_arrays)
return gluon.data.DataLoader(dataset, batch_size, shuffle=is_train)
def synthetic_data(w, b, num_examples): #@save
"""y = Xw + b + ノイズ を生成する。"""
X = d2l.normal(0, 1, (num_examples, len(w)))
y = d2l.matmul(X, w) + b
y += d2l.normal(0, 0.01, y.shape)
return X, d2l.reshape(y, (-1, 1))
def sgd(params, lr, batch_size): #@save
"""ミニバッチ確率的勾配降下法。"""
for param in params:
param[:] = param - lr * param.grad / batch_size
def get_dataloader_workers(): #@save
"""Windows を除き、データ読み込みに4プロセスを使う。"""
return 0 if sys.platform.startswith('win') else 4
def load_data_fashion_mnist(batch_size, resize=None): #@save
"""Fashion-MNIST データセットをダウンロードしてメモリに読み込む。"""
dataset = gluon.data.vision
trans = [dataset.transforms.ToTensor()]
if resize:
trans.insert(0, dataset.transforms.Resize(resize))
trans = dataset.transforms.Compose(trans)
mnist_train = dataset.FashionMNIST(train=True).transform_first(trans)
mnist_test = dataset.FashionMNIST(train=False).transform_first(trans)
return (gluon.data.DataLoader(mnist_train, batch_size, shuffle=True,
num_workers=get_dataloader_workers()),
gluon.data.DataLoader(mnist_test, batch_size, shuffle=False,
num_workers=get_dataloader_workers()))
def evaluate_accuracy_gpu(net, data_iter, device=None): #@save
"""GPU を使ってデータセット上のモデルの精度を計算する。"""
if not device: # 最初のパラメータが置かれている最初のデバイスを問い合わせる
device = list(net.collect_params().values())[0].list_ctx()[0]
# 正解予測数、予測数
metric = d2l.Accumulator(2)
for X, y in data_iter:
X, y = X.as_in_ctx(device), y.as_in_ctx(device)
metric.add(d2l.accuracy(net(X), y), d2l.size(y))
return metric[0] / metric[1]
#@save
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):
"""第6章で定義したGPUを使ったモデルの学習。"""
net.initialize(force_reinit=True, ctx=device, init=init.Xavier())
loss = gluon.loss.SoftmaxCrossEntropyLoss()
trainer = gluon.Trainer(net.collect_params(),
'sgd', {'learning_rate': lr})
animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
legend=['train loss', 'train acc', 'test acc'])
timer, num_batches = d2l.Timer(), len(train_iter)
for epoch in range(num_epochs):
# 学習損失の合計、学習精度の合計、サンプル数
metric = d2l.Accumulator(3)
for i, (X, y) in enumerate(train_iter):
timer.start()
# ここが `d2l.train_epoch_ch3` との主な違い
X, y = X.as_in_ctx(device), y.as_in_ctx(device)
with autograd.record():
y_hat = net(X)
l = loss(y_hat, y)
l.backward()
trainer.step(X.shape[0])
metric.add(l.sum(), d2l.accuracy(y_hat, y), X.shape[0])
timer.stop()
train_l = metric[0] / metric[2]
train_acc = metric[1] / metric[2]
if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
animator.add(epoch + (i + 1) / num_batches,
(train_l, train_acc, None))
test_acc = evaluate_accuracy_gpu(net, test_iter)
animator.add(epoch + 1, (None, None, test_acc))
print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
f'test acc {test_acc:.3f}')
print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
f'on {str(device)}')
def grad_clipping(net, theta): #@save
"""勾配をクリップする。"""
if isinstance(net, gluon.Block):
params = [p.data() for p in net.collect_params().values()]
else:
params = net.params
norm = math.sqrt(sum((p.grad ** 2).sum() for p in params))
if norm > theta:
for param in params:
param.grad[:] *= theta / norm
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5): #@save
"""画像のリストを描画する。"""
figsize = (num_cols * scale, num_rows * scale)
_, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
axes = axes.flatten()
for i, (ax, img) in enumerate(zip(axes, imgs)):
try:
img = d2l.numpy(img)
except:
pass
ax.imshow(img)
ax.axes.get_xaxis().set_visible(False)
ax.axes.get_yaxis().set_visible(False)
if titles:
ax.set_title(titles[i])
return axes
def load_array(data_arrays, batch_size, is_train=True): #@save
"""TensorFlow のデータイテレータを構築する。"""
dataset = tf.data.Dataset.from_tensor_slices(data_arrays)
if is_train:
dataset = dataset.shuffle(buffer_size=1000)
dataset = dataset.batch(batch_size)
return dataset
def synthetic_data(w, b, num_examples): #@save
"""y = Xw + b + ノイズ を生成する。"""
X = tf.zeros((num_examples, w.shape[0]))
X += tf.random.normal(shape=X.shape)
y = tf.matmul(X, tf.reshape(w, (-1, 1))) + b
y += tf.random.normal(shape=y.shape, stddev=0.01)
y = tf.reshape(y, (-1, 1))
return X, y
def sgd(params, grads, lr, batch_size): #@save
"""ミニバッチ確率的勾配降下法。"""
for param, grad in zip(params, grads):
param.assign_sub(lr * grad / batch_size)
def load_data_fashion_mnist(batch_size, resize=None): #@save
"""Fashion-MNIST データセットをダウンロードしてメモリに読み込む。"""
mnist_train, mnist_test = tf.keras.datasets.fashion_mnist.load_data()
# すべての数値を255で割って画素値を0から1の間にし、最後にバッチ次元を追加する。
# さらにラベルを int32 にキャストする
process = lambda X, y: (tf.expand_dims(X, axis=3) / 255,
tf.cast(y, dtype='int32'))
resize_fn = lambda X, y: (
tf.image.resize_with_pad(X, resize, resize) if resize else X, y)
return (
tf.data.Dataset.from_tensor_slices(process(*mnist_train)).batch(
batch_size).shuffle(len(mnist_train[0])).map(resize_fn),
tf.data.Dataset.from_tensor_slices(process(*mnist_test)).batch(
batch_size).map(resize_fn))
class TrainCallback(tf.keras.callbacks.Callback): #@save
"""学習の進捗を可視化するためのコールバック。"""
def __init__(self, net, train_iter, test_iter, num_epochs, device_name):
self.timer = d2l.Timer()
self.animator = d2l.Animator(
xlabel='epoch', xlim=[1, num_epochs], legend=[
'train loss', 'train acc', 'test acc'])
self.net = net
self.train_iter = train_iter
self.test_iter = test_iter
self.num_epochs = num_epochs
self.device_name = device_name
def on_epoch_begin(self, epoch, logs=None):
self.timer.start()
def on_epoch_end(self, epoch, logs):
self.timer.stop()
test_acc = self.net.evaluate(
self.test_iter, verbose=0, return_dict=True)['accuracy']
metrics = (logs['loss'], logs['accuracy'], test_acc)
self.animator.add(epoch + 1, metrics)
if epoch == self.num_epochs - 1:
batch_size = next(iter(self.train_iter))[0].shape[0]
num_examples = batch_size * tf.data.experimental.cardinality(
self.train_iter).numpy()
print(f'loss {metrics[0]:.3f}, train acc {metrics[1]:.3f}, '
f'test acc {metrics[2]:.3f}')
print(f'{num_examples / self.timer.avg():.1f} examples/sec on '
f'{str(self.device_name)}')
#@save
def train_ch6(net_fn, train_iter, test_iter, num_epochs, lr, device):
"""第6章で定義したGPUを使ったモデルの学習。"""
device_name = device._device_name
strategy = tf.distribute.OneDeviceStrategy(device_name)
with strategy.scope():
optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
net = net_fn()
net.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
callback = TrainCallback(net, train_iter, test_iter, num_epochs,
device_name)
net.fit(train_iter, epochs=num_epochs, verbose=0, callbacks=[callback])
return net
環境を作成
def make_env(name ='', seed=0): #@save
# 入力パラメータ:
# name: gym 環境を指定する。
# Value iteration では FrozenLake-v1 のみサポートする。
if name == 'FrozenLake-v1':
return frozen_lake(seed)
else:
raise ValueError("%s env is not supported in this Notebook")
def evaluate_accuracy(net, data_iter): #@save
"""データセット上のモデルの精度を計算する。"""
metric = Accumulator(2) # 正解予測数、予測数
for X, y in data_iter:
metric.add(accuracy(net(X), y), d2l.size(y))
return metric[0] / metric[1]
import os
import requests
import zipfile
import tarfile
import hashlib
def download(url, folder='../data', sha1_hash=None): #@save
"""ファイルを folder にダウンロードし、ローカルのファイルパスを返す。"""
if not url.startswith('http'):
# 後方互換性のため
url, sha1_hash = DATA_HUB[url]
os.makedirs(folder, exist_ok=True)
fname = os.path.join(folder, url.split('/')[-1])
# キャッシュにヒットするか確認する
if os.path.exists(fname) and sha1_hash:
sha1 = hashlib.sha1()
with open(fname, 'rb') as f:
while True:
data = f.read(1048576)
if not data:
break
sha1.update(data)
if sha1.hexdigest() == sha1_hash:
return fname
# ダウンロード
print(f'Downloading {fname} from {url}...')
r = requests.get(url, stream=True, verify=True)
with open(fname, 'wb') as f:
f.write(r.content)
return fname
def extract(filename, folder=None): #@save
"""zip/tar ファイルを folder に展開する。"""
base_dir = os.path.dirname(filename)
_, ext = os.path.splitext(filename)
assert ext in ('.zip', '.tar', '.gz'), 'Only support zip/tar files.'
if ext == '.zip':
fp = zipfile.ZipFile(filename, 'r')
else:
fp = tarfile.open(filename, 'r')
if folder is None:
folder = base_dir
fp.extractall(folder)
def evaluate_accuracy(net, data_iter): #@save
"""データセット上のモデルの精度を計算する。"""
metric = Accumulator(2) # 正解予測数、予測数
for X, y in data_iter:
metric.add(accuracy(net(X), y), d2l.size(y))
return metric[0] / metric[1]
価値関数を表示
def show_value_function_progress(env_desc, V, pi): #@save
# この関数は、価値と方策が時間とともにどのように変化するかを可視化する。
# V: [num_iters, num_states]
# pi: [num_iters, num_states]
# 価値関数の可視化方法は次を参考にしている(ただし変更あり):
# https://sites.google.com/view/deep-rl-bootcamp/labs
num_iters = V.shape[0]
fig, ax = plt.subplots(figsize=(15, 15))
for k in range(V.shape[0]):
plt.subplot(4, 4, k + 1)
plt.imshow(V[k].reshape(4,4), cmap="bone")
ax = plt.gca()
ax.set_xticks(np.arange(0, 5)-.5, minor=True)
ax.set_yticks(np.arange(0, 5)-.5, minor=True)
ax.grid(which="minor", color="w", linestyle='-', linewidth=3)
ax.tick_params(which="minor", bottom=False, left=False)
ax.set_xticks([])
ax.set_yticks([])
# LEFT action: 0, DOWN action: 1
# RIGHT action: 2, UP action: 3
action2dxdy = {0:(-.25, 0),1: (0, .25),
2:(0.25, 0),3: (-.25, 0)}
for y in range(4):
for x in range(4):
action = pi[k].reshape(4,4)[y, x]
dx, dy = action2dxdy[action]
if env_desc[y,x].decode() == 'H':
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="y",
size=20, fontweight='bold')
elif env_desc[y,x].decode() == 'G':
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="w",
size=20, fontweight='bold')
else:
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="g",
size=15, fontweight='bold')
# G と H のラベルがあるセルには矢印を描かない
if env_desc[y,x].decode() != 'G' and env_desc[y,x].decode() != 'H':
ax.arrow(x, y, dx, dy, color='r', head_width=0.2, head_length=0.15)
ax.set_title("Step = " + str(k + 1), fontsize=20)
fig.tight_layout()
plt.show()
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5): #@save
"""画像のリストを描画する。"""
figsize = (num_cols * scale, num_rows * scale)
_, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
axes = axes.flatten()
for i, (ax, img) in enumerate(zip(axes, imgs)):
try:
img = d2l.numpy(img)
except:
pass
ax.imshow(img)
ax.axes.get_xaxis().set_visible(False)
ax.axes.get_yaxis().set_visible(False)
if titles:
ax.set_title(titles[i])
return axes
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5): #@save
"""画像のリストを描画する。"""
figsize = (num_cols * scale, num_rows * scale)
_, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
axes = axes.flatten()
for i, (ax, img) in enumerate(zip(axes, imgs)):
try:
img = d2l.numpy(img)
except:
pass
ax.imshow(img)
ax.axes.get_xaxis().set_visible(False)
ax.axes.get_yaxis().set_visible(False)
if titles:
ax.set_title(titles[i])
return axes
Q関数を表示
def show_Q_function_progress(env_desc, V_all, pi_all): #@save
# この関数は、価値と方策が時間とともにどのように変化するかを可視化する。
# V: [num_iters, num_states]
# pi: [num_iters, num_states]
# 表示する値は少数にする
num_iters_all = V_all.shape[0]
num_iters = num_iters_all // 10
vis_indx = np.arange(0, num_iters_all, num_iters).tolist()
vis_indx.append(num_iters_all - 1)
V = np.zeros((len(vis_indx), V_all.shape[1]))
pi = np.zeros((len(vis_indx), V_all.shape[1]))
for c, i in enumerate(vis_indx):
V[c] = V_all[i]
pi[c] = pi_all[i]
num_iters = V.shape[0]
fig, ax = plt.subplots(figsize=(15, 15))
for k in range(V.shape[0]):
plt.subplot(4, 4, k + 1)
plt.imshow(V[k].reshape(4,4), cmap="bone")
ax = plt.gca()
ax.set_xticks(np.arange(0, 5)-.5, minor=True)
ax.set_yticks(np.arange(0, 5)-.5, minor=True)
ax.grid(which="minor", color="w", linestyle='-', linewidth=3)
ax.tick_params(which="minor", bottom=False, left=False)
ax.set_xticks([])
ax.set_yticks([])
# LEFT action: 0, DOWN action: 1
# RIGHT action: 2, UP action: 3
action2dxdy = {0:(-.25, 0),1:(0, .25),
2:(0.25, 0),3:(-.25, 0)}
for y in range(4):
for x in range(4):
action = pi[k].reshape(4,4)[y, x]
dx, dy = action2dxdy[action]
if env_desc[y,x].decode() == 'H':
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="y",
size=20, fontweight='bold')
elif env_desc[y,x].decode() == 'G':
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="w",
size=20, fontweight='bold')
else:
ax.text(x, y, str(env_desc[y,x].decode()),
ha="center", va="center", color="g",
size=15, fontweight='bold')
# G と H のラベルがあるセルには矢印を描かない
if env_desc[y,x].decode() != 'G' and env_desc[y,x].decode() != 'H':
ax.arrow(x, y, dx, dy, color='r', head_width=0.2, head_length=0.15)
ax.set_title("Step = " + str(vis_indx[k] + 1), fontsize=20)
fig.tight_layout()
plt.show()
def linreg(X, w, b): #@save
"""線形回帰モデル。"""
return d2l.matmul(X, w) + b
def squared_loss(y_hat, y): #@save
"""二乗損失。"""
return (y_hat - d2l.reshape(y, y_hat.shape)) ** 2 / 2
def get_fashion_mnist_labels(labels): #@save
"""Fashion-MNIST データセットのテキストラベルを返す。"""
text_labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat',
'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
return [text_labels[int(i)] for i in labels]
class Animator: #@save
"""アニメーションでデータを描画する。"""
def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
ylim=None, xscale='linear', yscale='linear',
fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
figsize=(3.5, 2.5)):
# 複数の線を段階的に描画する
if legend is None:
legend = []
d2l.use_svg_display()
self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
if nrows * ncols == 1:
self.axes = [self.axes, ]
# 引数を捕捉するためにラムダ関数を使う
self.config_axes = lambda: d2l.set_axes(
self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
self.X, self.Y, self.fmts = None, None, fmts
def add(self, x, y):
# 複数のデータ点を図に追加する
if not hasattr(y, "__len__"):
y = [y]
n = len(y)
if not hasattr(x, "__len__"):
x = [x] * n
if not self.X:
self.X = [[] for _ in range(n)]
if not self.Y:
self.Y = [[] for _ in range(n)]
for i, (a, b) in enumerate(zip(x, y)):
if a is not None and b is not None:
self.X[i].append(a)
self.Y[i].append(b)
self.axes[0].cla()
for x, y, fmt in zip(self.X, self.Y, self.fmts):
self.axes[0].plot(x, y, fmt)
self.config_axes()
display.display(self.fig)
display.clear_output(wait=True)
class Accumulator: #@save
"""`n` 個の変数の和を蓄積する。"""
def __init__(self, n):
self.data = [0.0] * n
def add(self, *args):
self.data = [a + float(b) for a, b in zip(self.data, args)]
def reset(self):
self.data = [0.0] * len(self.data)
def __getitem__(self, idx):
return self.data[idx]
def accuracy(y_hat, y): #@save
"""正解予測数を計算する。"""
if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
y_hat = d2l.argmax(y_hat, axis=1)
cmp = d2l.astype(y_hat, y.dtype) == y
return float(d2l.reduce_sum(d2l.astype(cmp, y.dtype)))
def linreg(X, w, b): #@save
"""線形回帰モデル。"""
return d2l.matmul(X, w) + b
def squared_loss(y_hat, y): #@save
"""二乗損失。"""
return (y_hat - d2l.reshape(y, y_hat.shape)) ** 2 / 2
def get_fashion_mnist_labels(labels): #@save
"""Fashion-MNIST データセットのテキストラベルを返す。"""
text_labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat',
'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
return [text_labels[int(i)] for i in labels]
class Animator: #@save
"""アニメーションでデータを描画する。"""
def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
ylim=None, xscale='linear', yscale='linear',
fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
figsize=(3.5, 2.5)):
# 複数の線を段階的に描画する
if legend is None:
legend = []
d2l.use_svg_display()
self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
if nrows * ncols == 1:
self.axes = [self.axes, ]
# 引数を捕捉するためにラムダ関数を使う
self.config_axes = lambda: d2l.set_axes(
self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
self.X, self.Y, self.fmts = None, None, fmts
def add(self, x, y):
# 複数のデータ点を図に追加する
if not hasattr(y, "__len__"):
y = [y]
n = len(y)
if not hasattr(x, "__len__"):
x = [x] * n
if not self.X:
self.X = [[] for _ in range(n)]
if not self.Y:
self.Y = [[] for _ in range(n)]
for i, (a, b) in enumerate(zip(x, y)):
if a is not None and b is not None:
self.X[i].append(a)
self.Y[i].append(b)
self.axes[0].cla()
for x, y, fmt in zip(self.X, self.Y, self.fmts):
self.axes[0].plot(x, y, fmt)
self.config_axes()
display.display(self.fig)
display.clear_output(wait=True)
class Accumulator: #@save
"""`n` 個の変数の和を蓄積する。"""
def __init__(self, n):
self.data = [0.0] * n
def add(self, *args):
self.data = [a + float(b) for a, b in zip(self.data, args)]
def reset(self):
self.data = [0.0] * len(self.data)
def __getitem__(self, idx):
return self.data[idx]
def accuracy(y_hat, y): #@save
"""正解予測数を計算する。"""
if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
y_hat = d2l.argmax(y_hat, axis=1)
cmp = d2l.astype(y_hat, y.dtype) == y
return float(d2l.reduce_sum(d2l.astype(cmp, y.dtype)))
トレーナー
非推奨になる関数群:
def load_array(data_arrays, batch_size, is_train=True): #@save
"""PyTorch のデータイテレータを構築する。"""
dataset = torch.utils.data.TensorDataset(*data_arrays)
return torch.utils.data.DataLoader(dataset, batch_size, shuffle=is_train)
def synthetic_data(w, b, num_examples): #@save
"""y = Xw + b + ノイズ を生成する。"""
X = d2l.normal(0, 1, (num_examples, len(w)))
y = d2l.matmul(X, w) + b
y += d2l.normal(0, 0.01, y.shape)
return X, d2l.reshape(y, (-1, 1))
def sgd(params, lr, batch_size): #@save
"""ミニバッチ確率的勾配降下法。"""
with torch.no_grad():
for param in params:
param -= lr * param.grad / batch_size
param.grad.zero_()
def get_dataloader_workers(): #@save
"""データ読み込みに4プロセスを使う。"""
return 4
def load_data_fashion_mnist(batch_size, resize=None): #@save
"""Fashion-MNIST データセットをダウンロードしてメモリに読み込む。"""
trans = [transforms.ToTensor()]
if resize:
trans.insert(0, transforms.Resize(resize))
trans = transforms.Compose(trans)
mnist_train = torchvision.datasets.FashionMNIST(
root="../data", train=True, transform=trans, download=True)
mnist_test = torchvision.datasets.FashionMNIST(
root="../data", train=False, transform=trans, download=True)
return (torch.utils.data.DataLoader(mnist_train, batch_size, shuffle=True,
num_workers=get_dataloader_workers()),
torch.utils.data.DataLoader(mnist_test, batch_size, shuffle=False,
num_workers=get_dataloader_workers()))
def evaluate_accuracy_gpu(net, data_iter, device=None): #@save
"""GPU を使ってデータセット上のモデルの精度を計算する。"""
if isinstance(net, nn.Module):
net.eval() # モデルを評価モードに設定する
if not device:
device = next(iter(net.parameters())).device
# 正解予測数、予測数
metric = d2l.Accumulator(2)
with torch.no_grad():
for X, y in data_iter:
if isinstance(X, list):
# BERT のファインチューニングで必要(後で扱う)
X = [x.to(device) for x in X]
else:
X = X.to(device)
y = y.to(device)
metric.add(d2l.accuracy(net(X), y), d2l.size(y))
return metric[0] / metric[1]
#@save
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):
"""第6章で定義したGPUを使ったモデルの学習。"""
def init_weights(m):
if type(m) == nn.Linear or type(m) == nn.Conv2d:
nn.init.xavier_uniform_(m.weight)
net.apply(init_weights)
print('training on', device)
net.to(device)
optimizer = torch.optim.SGD(net.parameters(), lr=lr)
loss = nn.CrossEntropyLoss()
animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
legend=['train loss', 'train acc', 'test acc'])
timer, num_batches = d2l.Timer(), len(train_iter)
for epoch in range(num_epochs):
# 学習損失の合計、学習精度の合計、サンプル数
metric = d2l.Accumulator(3)
net.train()
for i, (X, y) in enumerate(train_iter):
timer.start()
optimizer.zero_grad()
X, y = X.to(device), y.to(device)
y_hat = net(X)
l = loss(y_hat, y)
l.backward()
optimizer.step()
with torch.no_grad():
metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
timer.stop()
train_l = metric[0] / metric[2]
train_acc = metric[1] / metric[2]
if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
animator.add(epoch + (i + 1) / num_batches,
(train_l, train_acc, None))
test_acc = evaluate_accuracy_gpu(net, test_iter)
animator.add(epoch + 1, (None, None, test_acc))
print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
f'test acc {test_acc:.3f}')
print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
f'on {str(device)}')
import os
import requests
import zipfile
import tarfile
import hashlib
def download(url, folder='../data', sha1_hash=None): #@save
"""ファイルを folder にダウンロードし、ローカルのファイルパスを返す。"""
if not url.startswith('http'):
# 後方互換性のため
url, sha1_hash = DATA_HUB[url]
os.makedirs(folder, exist_ok=True)
fname = os.path.join(folder, url.split('/')[-1])
# キャッシュにヒットするか確認する
if os.path.exists(fname) and sha1_hash:
sha1 = hashlib.sha1()
with open(fname, 'rb') as f:
while True:
data = f.read(1048576)
if not data:
break
sha1.update(data)
if sha1.hexdigest() == sha1_hash:
return fname
# ダウンロード
print(f'Downloading {fname} from {url}...')
r = requests.get(url, stream=True, verify=True)
with open(fname, 'wb') as f:
f.write(r.content)
return fname
def extract(filename, folder=None): #@save
"""zip/tar ファイルを folder に展開する。"""
base_dir = os.path.dirname(filename)
_, ext = os.path.splitext(filename)
assert ext in ('.zip', '.tar', '.gz'), 'Only support zip/tar files.'
if ext == '.zip':
fp = zipfile.ZipFile(filename, 'r')
else:
fp = tarfile.open(filename, 'r')
if folder is None:
folder = base_dir
fp.extractall(folder)
import os
import requests
import zipfile
import tarfile
import hashlib
def download(url, folder='../data', sha1_hash=None): #@save
"""ファイルを folder にダウンロードし、ローカルのファイルパスを返す。"""
if not url.startswith('http'):
# 後方互換性のため
url, sha1_hash = DATA_HUB[url]
os.makedirs(folder, exist_ok=True)
fname = os.path.join(folder, url.split('/')[-1])
# キャッシュにヒットするか確認する
if os.path.exists(fname) and sha1_hash:
sha1 = hashlib.sha1()
with open(fname, 'rb') as f:
while True:
data = f.read(1048576)
if not data:
break
sha1.update(data)
if sha1.hexdigest() == sha1_hash:
return fname
# ダウンロード
print(f'Downloading {fname} from {url}...')
r = requests.get(url, stream=True, verify=True)
with open(fname, 'wb') as f:
f.write(r.content)
return fname
def extract(filename, folder=None): #@save
"""zip/tar ファイルを folder に展開する。"""
base_dir = os.path.dirname(filename)
_, ext = os.path.splitext(filename)
assert ext in ('.zip', '.tar', '.gz'), 'Only support zip/tar files.'
if ext == '.zip':
fp = zipfile.ZipFile(filename, 'r')
else:
fp = tarfile.open(filename, 'r')
if folder is None:
folder = base_dir
fp.extractall(folder)
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5): #@save
"""画像のリストを描画する。"""
figsize = (num_cols * scale, num_rows * scale)
_, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
axes = axes.flatten()
for i, (ax, img) in enumerate(zip(axes, imgs)):
try:
img = d2l.numpy(img)
except:
pass
ax.imshow(img)
ax.axes.get_xaxis().set_visible(False)
ax.axes.get_yaxis().set_visible(False)
if titles:
ax.set_title(titles[i])
return axes
def download_extract(name, folder=None): #@save
"""zip/tar ファイルをダウンロードして展開する。"""
fname = download(name)
base_dir = os.path.dirname(fname)
data_dir, ext = os.path.splitext(fname)
if ext == '.zip':
fp = zipfile.ZipFile(fname, 'r')
elif ext in ('.tar', '.gz'):
fp = tarfile.open(fname, 'r')
else:
assert False, 'Only zip/tar files can be extracted.'
fp.extractall(base_dir)
return os.path.join(base_dir, folder) if folder else data_dir
def tokenize(lines, token='word'): #@save
"""テキスト行を単語または文字のトークンに分割する。"""
assert token in ('word', 'char'), 'Unknown token type: ' + token
return [line.split() if token == 'word' else list(line) for line in lines]
def download_extract(name, folder=None): #@save
"""zip/tar ファイルをダウンロードして展開する。"""
fname = download(name)
base_dir = os.path.dirname(fname)
data_dir, ext = os.path.splitext(fname)
if ext == '.zip':
fp = zipfile.ZipFile(fname, 'r')
elif ext in ('.tar', '.gz'):
fp = tarfile.open(fname, 'r')
else:
assert False, 'Only zip/tar files can be extracted.'
fp.extractall(base_dir)
return os.path.join(base_dir, folder) if folder else data_dir
def tokenize(lines, token='word'): #@save
"""テキスト行を単語または文字のトークンに分割する。"""
assert token in ('word', 'char'), 'Unknown token type: ' + token
return [line.split() if token == 'word' else list(line) for line in lines]
def linreg(X, w, b): #@save
"""線形回帰モデル。"""
return d2l.matmul(X, w) + b
def squared_loss(y_hat, y): #@save
"""二乗損失。"""
return (y_hat - d2l.reshape(y, y_hat.shape)) ** 2 / 2
def get_fashion_mnist_labels(labels): #@save
"""Fashion-MNIST データセットのテキストラベルを返す。"""
text_labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat',
'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
return [text_labels[int(i)] for i in labels]
class Animator: #@save
"""アニメーションでデータを描画する。"""
def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
ylim=None, xscale='linear', yscale='linear',
fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
figsize=(3.5, 2.5)):
# 複数の線を段階的に描画する
if legend is None:
legend = []
d2l.use_svg_display()
self.fig, self.axes = d2l.plt.subplots(nrows, ncols, figsize=figsize)
if nrows * ncols == 1:
self.axes = [self.axes, ]
# 引数を捕捉するためにラムダ関数を使う
self.config_axes = lambda: d2l.set_axes(
self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
self.X, self.Y, self.fmts = None, None, fmts
def add(self, x, y):
# 複数のデータ点を図に追加する
if not hasattr(y, "__len__"):
y = [y]
n = len(y)
if not hasattr(x, "__len__"):
x = [x] * n
if not self.X:
self.X = [[] for _ in range(n)]
if not self.Y:
self.Y = [[] for _ in range(n)]
for i, (a, b) in enumerate(zip(x, y)):
if a is not None and b is not None:
self.X[i].append(a)
self.Y[i].append(b)
self.axes[0].cla()
for x, y, fmt in zip(self.X, self.Y, self.fmts):
self.axes[0].plot(x, y, fmt)
self.config_axes()
display.display(self.fig)
display.clear_output(wait=True)
class Accumulator: #@save
"""`n` 個の変数の和を蓄積する。"""
def __init__(self, n):
self.data = [0.0] * n
def add(self, *args):
self.data = [a + float(b) for a, b in zip(self.data, args)]
def reset(self):
self.data = [0.0] * len(self.data)
def __getitem__(self, idx):
return self.data[idx]
def accuracy(y_hat, y): #@save
"""正解予測数を計算する。"""
if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
y_hat = d2l.argmax(y_hat, axis=1)
cmp = d2l.astype(y_hat, y.dtype) == y
return float(d2l.reduce_sum(d2l.astype(cmp, y.dtype)))
def evaluate_loss(net, data_iter, loss): #@save
"""与えられたデータセット上でモデルの損失を評価する。"""
metric = d2l.Accumulator(2) # 損失の合計、サンプル数
for X, y in data_iter:
l = loss(net(X), y)
metric.add(d2l.reduce_sum(l), d2l.size(l))
return metric[0] / metric[1]
def evaluate_loss(net, data_iter, loss): #@save
"""与えられたデータセット上でモデルの損失を評価する。"""
metric = d2l.Accumulator(2) # 損失の合計、サンプル数
for X, y in data_iter:
l = loss(net(X), y)
metric.add(d2l.reduce_sum(l), d2l.size(l))
return metric[0] / metric[1]
import os
import requests
import zipfile
import tarfile
import hashlib
def download(url, folder='../data', sha1_hash=None): #@save
"""ファイルを folder にダウンロードし、ローカルのファイルパスを返す。"""
if not url.startswith('http'):
# 後方互換性のため
url, sha1_hash = DATA_HUB[url]
os.makedirs(folder, exist_ok=True)
fname = os.path.join(folder, url.split('/')[-1])
# キャッシュにヒットするか確認する
if os.path.exists(fname) and sha1_hash:
sha1 = hashlib.sha1()
with open(fname, 'rb') as f:
while True:
data = f.read(1048576)
if not data:
break
sha1.update(data)
if sha1.hexdigest() == sha1_hash:
return fname
# ダウンロード
print(f'Downloading {fname} from {url}...')
r = requests.get(url, stream=True, verify=True)
with open(fname, 'wb') as f:
f.write(r.content)
return fname
def extract(filename, folder=None): #@save
"""zip/tar ファイルを folder に展開する。"""
base_dir = os.path.dirname(filename)
_, ext = os.path.splitext(filename)
assert ext in ('.zip', '.tar', '.gz'), 'Only support zip/tar files.'
if ext == '.zip':
fp = zipfile.ZipFile(filename, 'r')
else:
fp = tarfile.open(filename, 'r')
if folder is None:
folder = base_dir
fp.extractall(folder)
#@save
d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
'94646ad1522d915e7b0f9296181140edcf86a4f5')
#@save
def read_data_nmt():
"""英仏データセットを読み込む。"""
data_dir = d2l.download_extract('fra-eng')
with open(os.path.join(data_dir, 'fra.txt'), 'r', encoding='utf-8') as f:
return f.read()
#@save
def preprocess_nmt(text):
"""英仏データセットを前処理する。"""
def no_space(char, prev_char):
return char in set(',.!?') and prev_char != ' '
# 改行なしスペースをスペースに置換し、大文字を小文字に変換する
text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
# 単語と句読点の間にスペースを挿入する
out = [' ' + char if i > 0 and no_space(char, text[i - 1]) else char
for i, char in enumerate(text)]
return ''.join(out)
#@save
def tokenize_nmt(text, num_examples=None):
"""英仏データセットをトークン化する。"""
source, target = [], []
for i, line in enumerate(text.split('\n')):
if num_examples and i > num_examples:
break
parts = line.split('\t')
if len(parts) == 2:
source.append(parts[0].split(' '))
target.append(parts[1].split(' '))
return source, target
#@save
def truncate_pad(line, num_steps, padding_token):
"""系列を切り詰めるかパディングする。"""
if len(line) > num_steps:
return line[:num_steps] # 切り詰め
return line + [padding_token] * (num_steps - len(line)) # パディング
#@save
def build_array_nmt(lines, vocab, num_steps):
"""機械翻訳のテキスト系列をミニバッチに変換する。"""
lines = [vocab[l] for l in lines]
lines = [l + [vocab['<eos>']] for l in lines]
array = d2l.tensor([truncate_pad(
l, num_steps, vocab['<pad>']) for l in lines])
valid_len = d2l.reduce_sum(
d2l.astype(array != vocab['<pad>'], d2l.int32), 1)
return array, valid_len
#@save
def load_data_nmt(batch_size, num_steps, num_examples=600):
"""翻訳データセットのイテレータと語彙を返す。"""
text = preprocess_nmt(read_data_nmt())
source, target = tokenize_nmt(text, num_examples)
src_vocab = d2l.Vocab(source, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
tgt_vocab = d2l.Vocab(target, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
data_iter = d2l.load_array(data_arrays, batch_size)
return data_iter, src_vocab, tgt_vocab
def grad_clipping(grads, theta): #@save
"""勾配をクリップする。"""
theta = tf.constant(theta, dtype=tf.float32)
new_grad = []
for grad in grads:
if isinstance(grad, tf.IndexedSlices):
new_grad.append(tf.convert_to_tensor(grad))
else:
new_grad.append(grad)
norm = tf.math.sqrt(sum((tf.reduce_sum(grad ** 2)).numpy()
for grad in new_grad))
norm = tf.cast(norm, tf.float32)
if tf.greater(norm, theta):
for i, grad in enumerate(new_grad):
new_grad[i] = grad * theta / norm
else:
new_grad = new_grad
return new_grad
def download_extract(name, folder=None): #@save
"""zip/tar ファイルをダウンロードして展開する。"""
fname = download(name)
base_dir = os.path.dirname(fname)
data_dir, ext = os.path.splitext(fname)
if ext == '.zip':
fp = zipfile.ZipFile(fname, 'r')
elif ext in ('.tar', '.gz'):
fp = tarfile.open(fname, 'r')
else:
assert False, 'Only zip/tar files can be extracted.'
fp.extractall(base_dir)
return os.path.join(base_dir, folder) if folder else data_dir
def tokenize(lines, token='word'): #@save
"""テキスト行を単語または文字のトークンに分割する。"""
assert token in ('word', 'char'), 'Unknown token type: ' + token
return [line.split() if token == 'word' else list(line) for line in lines]
#@save
class MaskedSoftmaxCELoss(gluon.loss.SoftmaxCELoss):
"""マスク付きの softmax クロスエントロピー損失。"""
# `pred` shape: (`batch_size`, `num_steps`, `vocab_size`)
# `label` shape: (`batch_size`, `num_steps`)
# `valid_len` shape: (`batch_size`,)
def forward(self, pred, label, valid_len):
# `weights` shape: (`batch_size`, `num_steps`, 1)
weights = np.expand_dims(np.ones_like(label), axis=-1)
weights = npx.sequence_mask(weights, valid_len, True, axis=1)
return super(MaskedSoftmaxCELoss, self).forward(pred, label, weights)
#@save
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""系列から系列へのモデルを学習する。"""
net.initialize(init.Xavier(), force_reinit=True, ctx=device)
trainer = gluon.Trainer(net.collect_params(), 'adam',
{'learning_rate': lr})
loss = MaskedSoftmaxCELoss()
animator = d2l.Animator(xlabel='epoch', ylabel='loss',
xlim=[10, num_epochs])
for epoch in range(num_epochs):
timer = d2l.Timer()
metric = d2l.Accumulator(2) # 学習損失の合計、トークン数
for batch in data_iter:
X, X_valid_len, Y, Y_valid_len = [
x.as_in_ctx(device) for x in batch]
bos = np.array(
[tgt_vocab['<bos>']] * Y.shape[0], ctx=device).reshape(-1, 1)
dec_input = d2l.concat([bos, Y[:, :-1]], 1) # Teacher forcing
with autograd.record():
Y_hat, _ = net(X, dec_input, X_valid_len)
l = loss(Y_hat, Y, Y_valid_len)
l.backward()
d2l.grad_clipping(net, 1)
num_tokens = Y_valid_len.sum()
trainer.step(num_tokens)
metric.add(l.sum(), num_tokens)
if (epoch + 1) % 10 == 0:
animator.add(epoch + 1, (metric[0] / metric[1],))
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
f'tokens/sec on {str(device)}')
#@save
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
device, save_attention_weights=False):
"""系列から系列への予測を行う。"""
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
src_vocab['<eos>']]
enc_valid_len = np.array([len(src_tokens)], ctx=device)
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
# バッチ次元を追加する
enc_X = np.expand_dims(np.array(src_tokens, ctx=device), axis=0)
enc_outputs = net.encoder(enc_X, enc_valid_len)
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
# バッチ次元を追加する
dec_X = np.expand_dims(np.array([tgt_vocab['<bos>']], ctx=device), axis=0)
output_seq, attention_weight_seq = [], []
for _ in range(num_steps):
Y, dec_state = net.decoder(dec_X, dec_state)
# 次時刻のデコーダ入力として、予測確率が最も高いトークンを使う
dec_X = Y.argmax(axis=2)
pred = dec_X.squeeze(axis=0).astype('int32').item()
# 注意重みを保存する(後で扱う)
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights)
# 終了トークンが予測されたら、出力系列の生成は完了
if pred == tgt_vocab['<eos>']:
break
output_seq.append(pred)
return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
#@save
d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
'94646ad1522d915e7b0f9296181140edcf86a4f5')
#@save
def read_data_nmt():
"""英仏データセットを読み込む。"""
data_dir = d2l.download_extract('fra-eng')
with open(os.path.join(data_dir, 'fra.txt'), 'r', encoding='utf-8') as f:
return f.read()
#@save
def preprocess_nmt(text):
"""英仏データセットを前処理する。"""
def no_space(char, prev_char):
return char in set(',.!?') and prev_char != ' '
# 改行なしスペースをスペースに置換し、大文字を小文字に変換する
text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
# 単語と句読点の間にスペースを挿入する
out = [' ' + char if i > 0 and no_space(char, text[i - 1]) else char
for i, char in enumerate(text)]
return ''.join(out)
#@save
def tokenize_nmt(text, num_examples=None):
"""英仏データセットをトークン化する。"""
source, target = [], []
for i, line in enumerate(text.split('\n')):
if num_examples and i > num_examples:
break
parts = line.split('\t')
if len(parts) == 2:
source.append(parts[0].split(' '))
target.append(parts[1].split(' '))
return source, target
#@save
def truncate_pad(line, num_steps, padding_token):
"""系列を切り詰めるかパディングする。"""
if len(line) > num_steps:
return line[:num_steps] # 切り詰め
return line + [padding_token] * (num_steps - len(line)) # パディング
#@save
def build_array_nmt(lines, vocab, num_steps):
"""機械翻訳のテキスト系列をミニバッチに変換する。"""
lines = [vocab[l] for l in lines]
lines = [l + [vocab['<eos>']] for l in lines]
array = d2l.tensor([truncate_pad(
l, num_steps, vocab['<pad>']) for l in lines])
valid_len = d2l.reduce_sum(
d2l.astype(array != vocab['<pad>'], d2l.int32), 1)
return array, valid_len
#@save
def load_data_nmt(batch_size, num_steps, num_examples=600):
"""翻訳データセットのイテレータと語彙を返す。"""
text = preprocess_nmt(read_data_nmt())
source, target = tokenize_nmt(text, num_examples)
src_vocab = d2l.Vocab(source, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
tgt_vocab = d2l.Vocab(target, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
data_iter = d2l.load_array(data_arrays, batch_size)
return data_iter, src_vocab, tgt_vocab
def evaluate_loss(net, data_iter, loss): #@save
"""与えられたデータセット上でモデルの損失を評価する。"""
metric = d2l.Accumulator(2) # 損失の合計、サンプル数
for X, y in data_iter:
out = net(X)
y = d2l.reshape(y, out.shape)
l = loss(out, y)
metric.add(d2l.reduce_sum(l), d2l.size(l))
return metric[0] / metric[1]
#@save
def sequence_mask(X, valid_len, value=0):
"""系列中の不要な要素をマスクする。"""
maxlen = X.shape[1]
mask = tf.range(start=0, limit=maxlen, dtype=tf.float32)[
None, :] < tf.cast(valid_len[:, None], dtype=tf.float32)
if len(X.shape) == 3:
return tf.where(tf.expand_dims(mask, axis=-1), X, value)
else:
return tf.where(mask, X, value)
#@save
class MaskedSoftmaxCELoss(tf.keras.losses.Loss):
"""マスク付きの softmax クロスエントロピー損失。"""
def __init__(self, valid_len):
super().__init__(reduction='none')
self.valid_len = valid_len
# `pred` shape: (`batch_size`, `num_steps`, `vocab_size`)
# `label` shape: (`batch_size`, `num_steps`)
# `valid_len` shape: (`batch_size`,)
def call(self, label, pred):
weights = tf.ones_like(label, dtype=tf.float32)
weights = sequence_mask(weights, self.valid_len)
label_one_hot = tf.one_hot(label, depth=pred.shape[-1])
unweighted_loss = tf.keras.losses.CategoricalCrossentropy(
from_logits=True, reduction='none')(label_one_hot, pred)
weighted_loss = tf.reduce_mean((unweighted_loss*weights), axis=1)
return weighted_loss
#@save
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""系列から系列へのモデルを学習する。"""
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
animator = d2l.Animator(xlabel="epoch", ylabel="loss",
xlim=[10, num_epochs])
for epoch in range(num_epochs):
timer = d2l.Timer()
metric = d2l.Accumulator(2) # 学習損失の合計、トークン数
for batch in data_iter:
X, X_valid_len, Y, Y_valid_len = [x for x in batch]
bos = tf.reshape(tf.constant([tgt_vocab['<bos>']] * Y.shape[0]),
shape=(-1, 1))
dec_input = tf.concat([bos, Y[:, :-1]], 1) # Teacher forcing
with tf.GradientTape() as tape:
Y_hat, _ = net(X, dec_input, X_valid_len, training=True)
l = MaskedSoftmaxCELoss(Y_valid_len)(Y, Y_hat)
gradients = tape.gradient(l, net.trainable_variables)
gradients = d2l.grad_clipping(gradients, 1)
optimizer.apply_gradients(zip(gradients, net.trainable_variables))
num_tokens = tf.reduce_sum(Y_valid_len).numpy()
metric.add(tf.reduce_sum(l), num_tokens)
if (epoch + 1) % 10 == 0:
animator.add(epoch + 1, (metric[0] / metric[1],))
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
f'tokens/sec on {str(device._device_name)}')
#@save
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
save_attention_weights=False):
"""系列から系列への予測を行う。"""
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
src_vocab['<eos>']]
enc_valid_len = tf.constant([len(src_tokens)])
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
# バッチ次元を追加する
enc_X = tf.expand_dims(src_tokens, axis=0)
enc_outputs = net.encoder(enc_X, enc_valid_len, training=False)
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
# バッチ次元を追加する
dec_X = tf.expand_dims(tf.constant([tgt_vocab['<bos>']]), axis=0)
output_seq, attention_weight_seq = [], []
for _ in range(num_steps):
Y, dec_state = net.decoder(dec_X, dec_state, training=False)
# 次時刻のデコーダ入力として、予測確率が最も高いトークンを使う
dec_X = tf.argmax(Y, axis=2)
pred = tf.squeeze(dec_X, axis=0)
# 注意重みを保存する
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights)
# 終了トークンが予測されたら、出力系列の生成は完了
if pred == tgt_vocab['<eos>']:
break
output_seq.append(pred.numpy())
return ' '.join(tgt_vocab.to_tokens(tf.reshape(output_seq, shape = -1).numpy().tolist())), attention_weight_seq
def grad_clipping(net, theta): #@save
"""勾配をクリップする。"""
if isinstance(net, nn.Module):
params = [p for p in net.parameters() if p.requires_grad]
else:
params = net.params
norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
if norm > theta:
for param in params:
param.grad[:] *= theta / norm
注意機構の章に関する追加。
#@save
d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
'94646ad1522d915e7b0f9296181140edcf86a4f5')
#@save
def read_data_nmt():
"""英仏データセットを読み込む。"""
data_dir = d2l.download_extract('fra-eng')
with open(os.path.join(data_dir, 'fra.txt'), 'r', encoding='utf-8') as f:
return f.read()
#@save
def preprocess_nmt(text):
"""英仏データセットを前処理する。"""
def no_space(char, prev_char):
return char in set(',.!?') and prev_char != ' '
# 改行なしスペースをスペースに置換し、大文字を小文字に変換する
text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
# 単語と句読点の間にスペースを挿入する
out = [' ' + char if i > 0 and no_space(char, text[i - 1]) else char
for i, char in enumerate(text)]
return ''.join(out)
#@save
def tokenize_nmt(text, num_examples=None):
"""英仏データセットをトークン化する。"""
source, target = [], []
for i, line in enumerate(text.split('\n')):
if num_examples and i > num_examples:
break
parts = line.split('\t')
if len(parts) == 2:
source.append(parts[0].split(' '))
target.append(parts[1].split(' '))
return source, target
#@save
def truncate_pad(line, num_steps, padding_token):
"""系列を切り詰めるかパディングする。"""
if len(line) > num_steps:
return line[:num_steps] # 切り詰め
return line + [padding_token] * (num_steps - len(line)) # パディング
#@save
def build_array_nmt(lines, vocab, num_steps):
"""機械翻訳のテキスト系列をミニバッチに変換する。"""
lines = [vocab[l] for l in lines]
lines = [l + [vocab['<eos>']] for l in lines]
array = d2l.tensor([truncate_pad(
l, num_steps, vocab['<pad>']) for l in lines])
valid_len = d2l.reduce_sum(
d2l.astype(array != vocab['<pad>'], d2l.int32), 1)
return array, valid_len
#@save
def load_data_nmt(batch_size, num_steps, num_examples=600):
"""翻訳データセットのイテレータと語彙を返す。"""
text = preprocess_nmt(read_data_nmt())
source, target = tokenize_nmt(text, num_examples)
src_vocab = d2l.Vocab(source, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
tgt_vocab = d2l.Vocab(target, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
data_iter = d2l.load_array(data_arrays, batch_size)
return data_iter, src_vocab, tgt_vocab
#@save
def sequence_mask(X, valid_len, value=0):
"""系列中の不要な要素をマスクする。"""
maxlen = X.size(1)
mask = torch.arange((maxlen), dtype=torch.float32,
device=X.device)[None, :] < valid_len[:, None]
X[~mask] = value
return X
#@save
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
"""マスク付きの softmax クロスエントロピー損失。"""
# `pred` shape: (`batch_size`, `num_steps`, `vocab_size`)
# `label` shape: (`batch_size`, `num_steps`)
# `valid_len` shape: (`batch_size`,)
def forward(self, pred, label, valid_len):
weights = torch.ones_like(label)
weights = sequence_mask(weights, valid_len)
self.reduction='none'
unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(
pred.permute(0, 2, 1), label)
weighted_loss = (unweighted_loss * weights).mean(dim=1)
return weighted_loss
#@save
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""系列から系列へのモデルを学習する。"""
def xavier_init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)
if type(m) == nn.GRU:
for param in m._flat_weights_names:
if "weight" in param:
nn.init.xavier_uniform_(m._parameters[param])
net.apply(xavier_init_weights)
net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = MaskedSoftmaxCELoss()
net.train()
animator = d2l.Animator(xlabel='epoch', ylabel='loss',
xlim=[10, num_epochs])
for epoch in range(num_epochs):
timer = d2l.Timer()
metric = d2l.Accumulator(2) # 学習損失の合計、トークン数
for batch in data_iter:
optimizer.zero_grad()
X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
device=device).reshape(-1, 1)
dec_input = d2l.concat([bos, Y[:, :-1]], 1) # Teacher forcing
Y_hat, _ = net(X, dec_input, X_valid_len)
l = loss(Y_hat, Y, Y_valid_len)
l.sum().backward() # `backward` のために損失をスカラーにする
d2l.grad_clipping(net, 1)
num_tokens = Y_valid_len.sum()
optimizer.step()
with torch.no_grad():
metric.add(l.sum(), num_tokens)
if (epoch + 1) % 10 == 0:
animator.add(epoch + 1, (metric[0] / metric[1],))
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
f'tokens/sec on {str(device)}')
#@save
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
device, save_attention_weights=False):
"""系列から系列への予測を行う。"""
# 推論のために `net` を評価モードに設定する
net.eval()
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
src_vocab['<eos>']]
enc_valid_len = torch.tensor([len(src_tokens)], device=device)
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
# バッチ次元を追加する
enc_X = torch.unsqueeze(
torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
enc_outputs = net.encoder(enc_X, enc_valid_len)
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
# バッチ次元を追加する
dec_X = torch.unsqueeze(torch.tensor(
[tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
output_seq, attention_weight_seq = [], []
for _ in range(num_steps):
Y, dec_state = net.decoder(dec_X, dec_state)
# 次時刻のデコーダ入力として、予測確率が最も高いトークンを使う
dec_X = Y.argmax(dim=2)
pred = dec_X.squeeze(dim=0).type(torch.int32).item()
# 注意重みを保存する(後で扱う)
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights)
# 終了トークンが予測されたら、出力系列の生成は完了
if pred == tgt_vocab['<eos>']:
break
output_seq.append(pred)
return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq