Как да направите прогнозиране на времеви серии, използвайки RNN, TensorFlow и Cloud ML Engine

API за оценка на tf.contrib.learn е много удобен начин да започнете да използвате TensorFlow. Наистина готиното от моята гледна точка за API на Estimators е, че използването му е много лесен начин за създаване на разпределени модели TensorFlow. Много от пробите на TensorFlow, които виждате, че се движат наоколо по интернет, не се разпространяват - те предполагат, че ще стартирате кода на една машина. Хората започват с такъв код и след това са нескромно натъжени да научат, че ниско нивоният TensorFlow код всъщност не работи върху пълния им набор от данни. След това трябва да свършат много работа, за да добавят разпределен код за обучение около първоначалната извадка и кой иска да редактира кода на някой друг?

Забележка: Оценителите вече са се преместили в основния Tensorflow. Актуализиран код, който използва tf.estimator вместо tf.contrib.learn.estimator, вече е в GitHub - използвайте обновения код като начална точка.

Така че, моля, моля, моля, ако видите проба TensorFlow, която не използва API за оценка, игнорирайте. Ще бъде много работа, за да работи върху производството на вашите производствени (прочетете: големи) набори от данни - ще има монитори, координатори, сървъри на параметри и всички видове системи, програмиращи лудориите, в които не искате да се гмуркате. Започнете с API на оценителя и използвайте класа експеримент. (Отказ от отговорност: моите възгледи, а не тези на моя работодател).

Предсказването на времевите серии се нуждае от персонализиран оценител

API на оценителите се предлага с класификатор и регресор на Deep Neural Network. Ако имате типични структурирани данни, следвайте урока, свързан по-горе, или вземете този курс за обучение от Google Cloud (скоро ще бъде наличен в Coursera) и ще сте на път да създадете модели за машинно обучение, които работят в големи набори от данни в реалния свят във вашия релационен склад на данни. Но какво ще стане, ако нямате типичен проблем със структурирани данни? В този случай често ще трябва да създадете персонализиран оценител. В тази публикация в блога ще ви покажа как.

Често срещан тип данни, на които ще искате да направите машинно обучение, са данни от времеви серии. По същество вашите входове са набор от числа и искате да предвидите следващото число в тази последователност. В тази статия ще го направя малко по-общ и ще предположа, че искате да предскажете последните две числа от последователността. Както върви поговорката на Computer Science, ако можете да направите две, можете да направите N.

Традиционната архитектура на невронната мрежа, която се използва за предсказване на последователност към последователност, се нарича Рекуррентна невронна мрежа (RNN). Вижте тази статия и тази за много достъпно запознаване с RNN. Но не е нужно да знаете как да внедрите RNN, за да го използвате, така че след като тези статии отидат по-дълбоко, отколкото искате, напуснете.

За да следвате тази статия, отворете моята тетрадка Jupyter в друг прозорец на браузъра. Тук показвам само ключови фрагменти от код. Бележникът (и папката GitHub) съдържа целия код.

Симулирайте някои данни от времеви серии

Обикновено е по-лесно да научите с малък набор от играчки, който можете да генерирате колкото искате. Реалните данни ще идват със собствени странности! Така че, нека да генерираме куп данни от времеви серии. Всяка последователност ще се състои от 10 числа. Ще използваме първите осем като входни данни, а последните два като етикети (т.е. какво трябва да се предвиди):

Кодът за генериране на тези последователности от времеви серии, използвайки numpy (np):

SEQ_LEN = 10
def create_time_series ():
  freq = (np.random.random () * 0.5) + 0.1 # 0.1 до 0.6
  ampl = np.random.random () + 0,5 # 0,5 до 1,5
  x = np.sin (np.arange (0, SEQ_LEN) * freq) * ampl
  връщане x

Напишете един куп от тези поредици от времеви серии на CSV файлове (train.csv и valid.csv) и ние работим. Имаме данни.

Входна функция

Начинът, по който работи API на оценителите в TensorFlow, е, че трябва да предоставите input_fn, за да четете данните си. Не предоставяте стойности x и y. Вместо това предоставяте функция, която връща входове и етикети. Входовете са речник на всичките ви входове (име на вход в тензор), а етикетите са тензор.

В нашия случай, нашият CSV файл просто се състои от 10 числа с плаваща запетая. DEFAULTS служи за определяне на типа данни за тензорите. Искаме да четем данните 20 реда наведнъж; това е BATCH_SIZE. Партида е броят на пробите, над които се извършва наклон на градиента. Ще трябва да експериментирате с този номер - ако той е твърде голям, обучението ви ще бъде бавно и ако е твърде малко, обучението ви ще отскочи, няма да се сближат. Тъй като имаме само вход, името, което го въвеждате, всъщност няма значение. Ще го наречем rawdata.

ЗАДЪЛЖЕНИЯ = [[0.0] за x в xrange (0, SEQ_LEN)]
BATCH_SIZE = 20
TIMESERIES_COL = 'rawdata "
N_OUTPUTS = 2 # във всяка последователност, 1-8 са функции, а 9-10 е етикет
N_INPUTS = SEQ_LEN - N_OUTPUTS

Input_fn, който API API за оценка, не трябва да приема параметри. Ние обаче искаме да можем да предоставим името на файла (ите), което да се чете в командния ред. Така че, нека напишем функция read_dataset (), която връща input_fn.

# прочетете данни и конвертирайте в необходимия формат
def read_dataset (име на файл, режим = tf.contrib.learn.ModeKeys.TRAIN):
  def _input_fn ():
    num_epochs = 100, ако режим == tf.contrib.learn.ModeKeys.TRAIN else 1

Първото нещо, което правим, е да решим броя на епохите. Това е колко пъти трябва да преминем през набора от данни. Ще преминем през набора от данни 100 пъти, ако тренираме, но само веднъж, ако оценяваме.

На следващо място ще направим разширение на wild-card. Много пъти програмите за големи данни произвеждат шарени файлове като train.csv-0001-of-0036 и така, бихме искали просто да предоставим train.csv * като вход. Използваме това за попълване на опашка за име на файл и след това използваме TextLineReader за четене на данните:

# може да бъде път към един файл или файл.
input_file_names = tf.train.match_filenames_once (име на файл)
filename_queue = tf.train.string_input_producer (
        input_file_names, num_epochs = num_epochs, shuffle = True)
читател = tf.TextLineReader ()
    _, value = reader.read_up_to (име на име_именник, num_records = BATCH_SIZE)
value_column = tf.expand_dims (стойност, -1)

След това ние декодираме данните, третирайки първите 8 числа като входни данни, а последните две като етикет. Входовете, когато го четем, е списък от 8 тензора, всеки от които е пакетен размер x 1. Използването на tf.concat го прави единичен тензор 8xbatchsize. Това е важно, тъй като API на оценителите иска тензори, а не списъци.

# all_data е списък на тензорите
all_data = tf.decode_csv (value_column, record_defaults = DEFAULTS)
inputs = all_data [: len (all_data) -N_OUTPUTS] # първи няколко стойности
label = all_data [len (all_data) -N_OUTPUTS:] # последните няколко стойности
   
# от списъка на тензорите до тензора с още едно измерение
inputs = tf.concat (входове, ос = 1)
label = tf.concat (етикет, ос = 1)
print 'inputs = {}'. Формат (входове)
   
връщане {TIMESERIES_COL: inputs}, етикет # dict на функциите, етикет

Определете RNN

Ако използвахме LinearRegressor, DNNRegressor, DNNLinearCombinedRegressor и т.н., можеше просто да използваме съществуващия клас. Но понеже правим прогнозиране на последователност към последователност, трябва да напишем собствената си моделна функция. Поне в момента API на Estimators не се предлага с RNNRegressor, който е извън кутията. Така че, нека разгърнем нашия собствен RNN модел, използвайки ниско ниво на TensorFlow функции.

LSTM_SIZE = 3 # брой скрити слоеве във всяка от LSTM клетките

# създайте изходния модел
дефинирайте simple_rnn (функции, цели, режим):
  # 0. Преформатирайте формата за въвеждане, за да се превърне в последователност
  x = tf.split (функции [TIMESERIES_COL], N_INPUTS, 1)
  #print 'x = {}' .формат (x)
    
  # 1. конфигурирайте RNN
  lstm_cell = rnn.BasicLSTMCell (LSTM_SIZE, забравяне_bias = 1.0)
  изходи, _ = rnn.static_rnn (lstm_cell, x, dtype = tf.float32)

  # филийка, за да запазите само последната клетка на RNN
  изходи = изходи [-1]
  #print 'last outputs = {}' .форматен формат (изходи)
  
  # изходът е резултат от линейно активиране на последния слой RNN
  тегло = tf. Променливо (tf.random_normal ([LSTM_SIZE, N_OUTPUTS]))
  отклонение = tf.Variable (tf.random_normal ([N_OUTPUTS]))
  прогнози = tf.matmul (изходи, тегло) + пристрастия
    
  # 2. Определете функцията за загуба за обучение / оценяване
  #print 'target = {}' .форматен формат (цели)
  #print 'preds = {}' .форматен формат (прогнози)
  загуба = tf.losses.mean_squared_error (цели, прогнози)
  eval_metric_ops = {
      "rmse": tf.metrics.root_mean_squared_error (цели, прогнози)
  }
  
  # 3. Определете тренировъчната операция / оптимизатор
  train_op = tf.contrib.layers.optimize_loss (
      загуба = загуба,
      global_step = tf.contrib.framework.get_global_step (),
      learning_rate = 0.01,
      оптимизатор = "SGD")

  # 4. Създайте прогнози
  predvitions_dict = {"прогнозиран": прогнози}
  
  # 5. върнете ModelFnOps
  връщане tflearn.ModelFnOps (
      режим = режим,
      прогнози = predictions_dict,
      загуба = загуба,
      train_op = train_op,
      eval_metric_ops = eval_metric_ops)

Спомнете си, че трябваше да пакетираме входовете в един тензор, за да го предадем като функции от input_fn. Стъпка 0 просто обръща този процес и връща списъка с тензорите.

Повтарящата се невронна мрежа се състои от BasicLSTMLCell, към който преминавате във входа. Получавате обратно изходи и състояния. Нарежете го, за да запазите само последната клетка на RNN - ние не използваме нито едно от предишните състояния. Възможни са и други архитектури. Например, бих могъл да обуча мрежата да има само един изход винаги и да използвам подвижни прозорци. Ще говоря за това как да променя моя пример, за да го направя в края на тази статия.

Коментарите в кода по-горе са доста обясняващи по отношение на останалите стъпки. Не правим нищо изненадващо там. Това е проблем с регресията, затова използвам RMSE.

Създайте експеримент

Класът Експеримент е интелигентният в API на оценители. Той знае как да поеме функцията на модела, входните функции за обучение и валидиране и да направи разумни неща по отношение на разпространението, ранното спиране и др.

дефинирай get_train ():
  връщане read_dataset ('train.csv', mode = tf.contrib.learn.ModeKeys.TRAIN)

дефинирай get_valid ():
  върнете read_dataset ('valid.csv', mode = tf.contrib.learn.ModeKeys.EVAL)

деф експеримент_фн (изход_дир):
    # пусни експеримент
    връщане tflearn.Experiment (
        tflearn.Estimator (model_fn = simple_rnn, model_dir = output_dir),
        train_input_fn = get_train (),
        eval_input_fn = get_valid (),
        eval_metrics = {
            'rmse': tflearn.MetricSpec (
                metric_fn = metrics.streaming_root_mean_squared_error
            )
        }
    )

shutil.rmtree ('outputdir', ignore_errors = True) # стартирайте всеки път нов
learn_runner.run (експеримент_фн, "изход":)

Обучение на облака

Кодът по-горе работи на една машина и ако го пакетирате в модул Python, можете също да го изпратите в Cloud ML Engine, за да го обучите без сървър:

OUTDIR = GS: // $ {BUCKET} / simplernn / model_trained
JOBNAME = simplernn _ $ (дата -u +% y% m% d_% H% M% S)
РАЙОН = нас-central1
gsutil -m rm -rf $ OUTDIR
заданията на gcloud ml-engine изпращат обучение $ JOBNAME \
   --region = $ REGION \
   --module-name = trainer.task \
   --package-path = $ {REPO} / simplernn / trainer \
   --job-dir = $ OUTDIR \
   --staging-bucket = gs: // $ BUCKET \
   --scale-tier = ОСНОВНО \
   - време за изпълнение = 1.0 \
   - \
   --train_data_paths = "gs: // $ {BUCKET} /train.csv*" \
   --eval_data_paths = "gs: // $ {BUCKET} /valid.csv*" \
   --output_dir = $ OUTDIR \
   --num_epochs = 100

Често срещан вариант: много дълги времеви серии

В тази статия предположих, че имате хиляди кратки (10-елементни) последователности. Ами ако имате много дълга последователност? Например, може да имате цена на запаса или отчитане на температурата от датчик. В такива случаи това, което можете да направите, е да разчупите дългата си последователност в подвижни последователности с фиксирана дължина. Тази дължина очевидно е произволна, но помислете за това като интервал „гледане назад“ на RNN. Ето кода на TensorFlow, който ще отнеме дълга последователност и ще се раздели на по-малки, припокриващи се последователности с фиксирана дължина:

внос tensorflow като tf
импортиране numpy като np
дефиниране на разделяне (sess, x, lookback_len):
  N = sess.run (tf.размер (x))
  windows = [tf.slice (x, [b], [lookback_len]) за b в xrange (0, N-lookback_len)]
  windows = tf.stack (windows)
  връщане на прозорци

Например:

x = tf.констант (np.arange (1,11, dtype = np.float32))
с tf.Session () като sess:
    отпечатайте 'input =', x.eval ()
    seqx = разпад (sess, x, 5)
    print 'output =', seqx.eval ()

ще доведе до:

вход = [1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
изход = [[1. 2. 3. 4. 5.]
 [2. 3. 4. 5. 6.]
 [3. 4. 5. 6. 7.]
 [4. 5. 6. 7. 8.]
 [5. 6. 7. 8. 9.]]

След като имате тези последователности с фиксирана дължина, всичко е същото като преди.

Честито кодиране!