東京都府中市、渋谷区のWEB制作会社Maromaroのブログです

2023.03.06

Sasaki

[深層学習]AppleSilicon搭載MacでTensorFlow(テンソルフロー)を用いてテキスト生成してみた。

こんにちは、佐々木です。

深層学習に触れてみたいな・・・と思い、最初はOpenAIのAPIを使ってと思ったのですが
もうちょっとちゃんとやってみようと思いGoogleが開発した「TensorFlow」をちょと触ってみました。

私の環境はMacbookProのM1Max搭載のモデルになります。

↓主にM1Macで動かすための格闘戦になってしまい、肝心な中身を掘ることができず、チュートリアルを動かすことに専念してしまいました・・・。
利用したチュートリアル:https://www.tensorflow.org/tutorials/text/text_generation?hl=ja

まずはTensorFlowをインストール

とってもわかりやすく公式まとめてありました。
https://www.tensorflow.org/install?hl=ja

とはいっても、この手のサイトは色々書いてあって、目がチカチカしますよね・・・。(AWSの公式サイトとか特に)

まず、前提として「Python の pip パッケージ マネージャー」が必要となります。

まず、pipをアップグレード

pip install --upgrade pip

続いてTensorflowをインストール

pip install tensorflow

意外に簡単に入っちゃいましたかね?

そんなわけがなかった!! M1環境では「zsh: illegal hardware instruction」と出て動かないので、パッケージマネージャ「Miniforge」というものを利用します。

下記の手順でインストールする(M1 Mac)

https://github.com/conda-forge/miniforge

Mambaforge-22.11.1-4-MacOSX-x86_64.shのようなファイルをダウンロード。

bash Miniforge3-MacOSX-arm64.sh
#のような形でshを実行

conda create --name python39 python=3.9.2
conda activate python39

#以降コマンド実行の際は↑のactivateをしてから行うconda install numpyconda install tensorflow
conda install tensorflow-depspython -m pip install tensorflow-macos==2.9 python -m pip install tensorflow-metal==0.5.0

conda activate python39した状態で、python -Vとすると、「Python 3.9.2」と表示されると思います。

Tensorflowを用いてテキスト生成してみる

なにやら「RNN によるテキスト生成」と言うらしい。
ほんっと用語が追いつかんです。

参考のチュートリアルに沿って実行していきます。
https://www.tensorflow.org/tutorials/text/text_generation?hl=ja

再帰的ニューラルネットワーク(RNN)とは

 Recurrent neural networkの略だそうです。

過去の情報を利用して現在および将来の入力に対するネットワークの性能を向上させる、ディープラーニングネットワーク構造です。

ということみたいです。なんとなくわかった気がする。

ファイルを準備する

適当なフォルダを用意し、ファイルを作成します。
私は「text_gen.py」としました。

コードを記入する

text_gen.pyにコードを書き込みます。

import tensorflow as tf
import numpy as np
import os
import time
#シェークスピアのテキストデータを取得してくる
path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')
print(path_to_file)

# 読み込んだのち、Python 2 との互換性のためにデコード
text = open(path_to_file, 'rb').read().decode(encoding='utf-8')

# テキストの長さは含まれる文字数
print ('Length of text: {} characters'.format(len(text)))

#テキストデータの250文字までを取得する
print(text[:250])

# ファイル中のユニークな文字の数
#set関数で重複を取り除き、ソートをかけている。
vocab = sorted(set(text))
print ('{} unique characters'.format(len(vocab)))

##テキストのベクトル化
# 文字列を数値表現に変換する
# それぞれの文字からインデックスへの対応表を作成
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)

#これはテキストの全データを格納したもの。
text_as_int = np.array([char2idx for c in text])

#変換した結果を表示(最初の20件まで)
print('{')
for char,_ in zip(char2idx, range(20)):
print(' {:4s}: {:3d},'.format(repr(char), char2idx[char]))
print(' ...\n}')

#下記のようなユニークな文字リストが生成される(1文字と順版)
#{
# '\n': 0,
# ' ' : 1,
# '!' : 2,
# '$' : 3,
# '&' : 4,
# "'" : 5,
# ',' : 6,
# '-' : 7,
# '.' : 8,
# '3' : 9,
# ':' : 10,
# ';' : 11,
# '?' : 12,
# 'A' : 13,
# ...
#}



# テキストの最初の 13 文字がどのように整数に変換されるかを見てみる
print ('{} ---- characters mapped to int ---- > {}'.format(repr(text[:13]), text_as_int[:13]))

# ひとつの入力としたいシーケンスの文字数としての最大の長さ
seq_length = 100
#examples_per_epoch = len(text)//(seq_length+1)

# 訓練用サンプルとターゲットを作る
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)

for i in char_dataset.take(5):
print(idx2char[i.numpy()])

#batch メソッドを使うと、個々の文字を求める長さのシーケンスに簡単に変換できます。
sequences = char_dataset.batch(seq_length+1, drop_remainder=True)

for item in sequences.take(5):
print(repr(''.join(idx2char[item.numpy()])))


# シーケンスそれぞれに対して、map メソッドを使って各バッチに単純な関数を適用することで、
# 複製とシフトを行い、入力テキストとターゲットテキストを生成します。
def split_input_target(chunk):
input_text = chunk[:-1]
target_text = chunk[1:]
return input_text, target_text

dataset = sequences.map(split_input_target)

#最初のサンプルの入力とターゲットを出力します。
for input_example, target_example in dataset.take(1):
print ('Input data: ', repr(''.join(idx2char[input_example.numpy()])))
print ('Target data:', repr(''.join(idx2char[target_example.numpy()])))


# これらのベクトルのインデックスそれぞれが一つのタイムステップとして処理されます。
# タイムステップ 0 の入力として、モデルは "F" のインデックスを受け取り、
# 次の文字として "i" のインデックスを予測しようとします。
# 次のタイムステップでもおなじことをしますが、RNN は現在の入力文字に加えて、
# 過去のステップのコンテキストも考慮します。
for i, (input_idx, target_idx) in enumerate(zip(input_example[:5], target_example[:5])):
print("Step {:4d}".format(i))
print(" input: {} ({:s})".format(input_idx, repr(idx2char[input_idx])))
print(" expected output: {} ({:s})".format(target_idx, repr(idx2char[target_idx])))

#訓練用バッチ
# バッチサイズ
BATCH_SIZE = 64

# データセットをシャッフルするためのバッファサイズ
# TF data は可能性として無限長のシーケンスでも使えるように設計されています。
# このため、シーケンス全体をメモリ内でシャッフルしようとはしません。
# その代わりに、要素をシャッフルするためのバッファを保持しています)
BUFFER_SIZE = 10000

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

dataset


# 文字数で表されるボキャブラリーの長さ
vocab_size = len(vocab)

# 埋め込みベクトルの次元
embedding_dim = 256

# RNN ユニットの数
rnn_units = 1024

def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
model = tf.keras.Sequential([
tf.keras.layers.Embedding(vocab_size, embedding_dim,
batch_input_shape=[batch_size, None]),
tf.keras.layers.GRU(rnn_units,
return_sequences=True,
stateful=True,
recurrent_initializer='glorot_uniform'),
tf.keras.layers.Dense(vocab_size)
])
return model

model = build_model(
vocab_size = len(vocab),
embedding_dim=embedding_dim,
rnn_units=rnn_units,
batch_size=BATCH_SIZE)



#モデルを試す
for input_example_batch, target_example_batch in dataset.take(1):
example_batch_predictions = model(input_example_batch)
print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

model.summary()

#サンプルで試す
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices,axis=-1).numpy()


print("Input: \n", repr("".join(idx2char[input_example_batch[0]])))
print()
print("Next Char Predictions: \n", repr("".join(idx2char[sampled_indices ])))


def loss(labels, logits):
return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)

example_batch_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss: ", example_batch_loss.numpy().mean())


model.compile(optimizer='adam', loss=loss)

# チェックポイントが保存されるディレクトリ
checkpoint_dir = './training_checkpoints'
# チェックポイントファイルの名称
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
filepath=checkpoint_prefix,
save_weights_only=True)

#何回繰り返して学習するか(ある程度学習させないとめちゃくちゃな文字が生成される)
EPOCHS=10

#訓練の実行
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

tf.train.latest_checkpoint(checkpoint_dir)
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)

model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))

model.build(tf.TensorShape([1, None]))
model.summary()

#テキスト生成する関数
def generate_text(model, start_string):
# 評価ステップ(学習済みモデルを使ったテキスト生成)

# 生成する文字数
num_generate = 100

# 開始文字列を数値に変換(ベクトル化)
input_eval = [char2idx[s] for s in start_string]
input_eval = tf.expand_dims(input_eval, 0)

# 結果を保存する空文字列
text_generated = []

# 低い temperature は、より予測しやすいテキストをもたらし
# 高い temperature は、より意外なテキストをもたらす
# 実験により最適な設定を見つけること
temperature = 1.0

# ここではバッチサイズ == 1
model.reset_states()
for i in range(num_generate):
predictions = model(input_eval)
# バッチの次元を削除
predictions = tf.squeeze(predictions, 0)

# カテゴリー分布をつかってモデルから返された文字を予測
predictions = predictions / temperature
predicted_id = tf.random.categorical(predictions, num_samples=1)[-1,0].numpy()

# 過去の隠れ状態とともに予測された文字をモデルへのつぎの入力として渡す
input_eval = tf.expand_dims([predicted_id], 0)

text_generated.append(idx2char[predicted_id])

return (start_string + ''.join(text_generated))


#テキストを生成する
print(generate_text(model, start_string=u"ROMEO: "))
 

実行結果

シェークスピアの訓練テキストから生成された文字が表示されればOKです。

ちょっと思ってたんと違う・・・・。
改めて、chatgptってすごいんだな・・・。

なんだか良くわからない深層学習をとりあずつついてみた・・・、という感じになってしまいました。(とりあえずチュートリアル動かした)
もっと深掘りしないといけないな、と(汗)