Source
This post is my rough, flow-level walkthrough of the official Google code for the TFT paper.
Analysis of the parts other than the model is posted in markdown format in the tft folder of the corresponding GitHub repo.
tft_model.py
: contains the full TFT architecture
Importing the required modules
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import gc
import json
import os
import shutil
import data_formatters.base
import libs.utils as utils
import numpy as np
import pandas as pd
import tensorflow as tf
- __future__ module : lets the same code run under both Python 2 and Python 3
layer definitions
# Layer definitions.
concat = tf.keras.backend.concatenate
stack = tf.keras.backend.stack
K = tf.keras.backend
Add = tf.keras.layers.Add
LayerNorm = tf.keras.layers.LayerNormalization
Dense = tf.keras.layers.Dense
Multiply = tf.keras.layers.Multiply
Dropout = tf.keras.layers.Dropout
Activation = tf.keras.layers.Activation
Lambda = tf.keras.layers.Lambda
- Keras backend module : a module that lets you handle low-level operations in Keras (in short, for now I understand it as the module that makes it easy to do tensor operations such as dot products)
- tf.keras.backend.concatenate : it takes as input a list of tensors, all of the same shape except for the concatenation axis, and returns a single tensor that is the concatenation of all inputs
- tf.keras.backend.stack : stacks a list of rank-R tensors into one rank-(R+1) tensor (see the shape comparison right after this list)
- tf.keras.layers.Add : layer that adds a list of inputs. It takes as input a list of tensors, all of the same shape, and returns a single tensor (also of the same shape)
- tf.keras.layers.Multiply : layer that multiplies (element-wise) a list of inputs
- tf.keras.layers.Lambda : used for simple custom layers
  ex. tf.keras.layers.Lambda(my_relu)
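To make the difference concrete, here is a minimal shape check of concatenate vs stack on two toy tensors of my own (assuming TF 2.x eager execution):

import tensorflow as tf
K = tf.keras.backend

a = tf.ones((2, 3))   # rank-2 tensor
b = tf.zeros((2, 3))  # rank-2 tensor

print(K.concatenate([a, b], axis=-1).shape)  # (2, 6): rank stays the same, the chosen axis grows
print(K.stack([a, b], axis=0).shape)         # (2, 2, 3): a new leading axis, rank R -> R+1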
Default input types
# Default input types.
InputTypes = data_formatters.base.InputTypes
remind)
class InputTypes(enum.IntEnum):
  """Defines input types of each column."""
  TARGET = 0
  OBSERVED_INPUT = 1
  KNOWN_INPUT = 2
  STATIC_INPUT = 3
  ID = 4  # Single column used as an entity identifier
  TIME = 5  # Single column exclusively used as a time index
Layer utility functions
1) linear_layer : returns simple Keras linear layer
def linear_layer(size, activation=None, use_time_distributed=False, use_bias=True):
  """Returns simple Keras linear layer.

  Args:
    size: Output size
    activation: Activation function to apply if required
    use_time_distributed: Whether to apply layer across time
    use_bias: Whether bias should be included in layer
  """
  linear = tf.keras.layers.Dense(size, activation=activation, use_bias=use_bias)
  if use_time_distributed:
    linear = tf.keras.layers.TimeDistributed(linear)
  return linear
- tf.keras.layers.TimeDistributed : applies the wrapped layer to every temporal slice of the input, so the same weights are shared across the time dimension of a (batch, time, features) tensor (a small sketch follows below)
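A minimal sketch of what wrapping Dense in TimeDistributed does, on a dummy (batch, time, features) input of my own (assuming TF 2.x eager execution):

import tensorflow as tf

x = tf.random.normal((4, 10, 8))  # (batch, time, features)
td_dense = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(16))
print(td_dense(x).shape)          # (4, 10, 16): the same Dense weights applied at every time step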
2) apply_mlp : applies simple feed-forward network to an input
def apply_mlp(inputs,
hidden_size,
output_size,
output_activation=None,
hidden_activation='tanh',
use_time_distributed=False):
"""Applies simple feed-forward network to an input.
Args:
inputs: MLP inputs
hidden_size: Hidden state size
output_size: Output size of MLP
output_activation: Activation function to apply on output
hidden_activation: Activation function to apply on input
use_time_distributed: Whether to apply across time
Returns:
Tensor for MLP outputs.
"""
if use_time_distributed:
hidden = tf.keras.layers.TimeDistributed(
tf.keras.layers.Dense(hidden_size, activation=hidden_activation))(
inputs)
return tf.keras.layers.TimeDistributed(
tf.keras.layers.Dense(output_size, activation=output_activation))(
hidden)
else:
hidden = tf.keras.layers.Dense(
hidden_size, activation=hidden_activation)(
inputs)
return tf.keras.layers.Dense(
output_size, activation=output_activation)(
hidden)
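A quick usage sketch of apply_mlp on a dummy temporal input (the sizes are my own hypothetical choices, assuming the function above is in scope):

x = tf.keras.layers.Input(shape=(10, 8))   # (time, features)
y = apply_mlp(x, hidden_size=16, output_size=4, use_time_distributed=True)
print(y.shape)                             # (None, 10, 4)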
3) apply_gating_layer : applies a GLU to an input
def apply_gating_layer(x,
hidden_layer_size,
dropout_rate=None,
use_time_distributed=True,
activation=None):
"""Applies a Gated Linear Unit (GLU) to an input.
Args:
x: Input to gating layer
hidden_layer_size: Dimension of GLU
dropout_rate: Dropout rate to apply if any
use_time_distributed: Whether to apply across time
activation: Activation function to apply to the linear feature transform if
necessary
Returns:
Tuple of tensors for: (GLU output, gate)
"""
if dropout_rate is not None:
x = tf.keras.layers.Dropout(dropout_rate)(x)
if use_time_distributed:
activation_layer = tf.keras.layers.TimeDistributed(
tf.keras.layers.Dense(hidden_layer_size, activation=activation))(
x)
gated_layer = tf.keras.layers.TimeDistributed(
tf.keras.layers.Dense(hidden_layer_size, activation='sigmoid'))(
x)
else:
activation_layer = tf.keras.layers.Dense(
hidden_layer_size, activation=activation)(
x)
gated_layer = tf.keras.layers.Dense(
hidden_layer_size, activation='sigmoid')(
x)
return tf.keras.layers.Multiply()([activation_layer,
gated_layer]), gated_layer
Note! There are two return values: (the GLU output, the gate) — this pair comes up again in (5) below.
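A quick usage sketch of apply_gating_layer (hidden_layer_size and dropout_rate are my own example values, assuming the function above is in scope):

x = tf.keras.layers.Input(shape=(10, 8))
glu_output, gate = apply_gating_layer(x, hidden_layer_size=8, dropout_rate=0.1)
print(glu_output.shape, gate.shape)  # both (None, 10, 8)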
4) add_and_norm : applies skip connection followed by layer normalisation
- x_list : [a (the original input, used as the skip connection), the gated layer output]
def add_and_norm(x_list):
"""Applies skip connection followed by layer normalisation.
Args:
x_list: List of inputs to sum for skip connection
Returns:
Tensor output from layer.
"""
tmp = Add()(x_list)
tmp = LayerNorm()(tmp)
return tmp
5) gated_residual_network : applies GRN as defined in paper
def gated_residual_network(x,
hidden_layer_size,
output_size=None,
dropout_rate=None,
use_time_distributed=True,
additional_context=None,
return_gate=False):
"""Applies the gated residual network (GRN) as defined in paper.
Args:
x: Network inputs
hidden_layer_size: Internal state size
output_size: Size of output layer
dropout_rate: Dropout rate if dropout is applied
use_time_distributed: Whether to apply network across time dimension
additional_context: Additional context vector to use if relevant
return_gate: Whether to return GLU gate for diagnostic purposes
Returns:
Tuple of tensors for: (GRN output, GLU gate)
"""
# Setup skip connection
if output_size is None:
output_size = hidden_layer_size
skip = x
else:
linear = Dense(output_size)
if use_time_distributed:
linear = tf.keras.layers.TimeDistributed(linear)
skip = linear(x)
# Apply feedforward network
hidden = linear_layer(
hidden_layer_size,
activation=None,
use_time_distributed=use_time_distributed)(
x)
if additional_context is not None:
hidden = hidden + linear_layer(
hidden_layer_size,
activation=None,
use_time_distributed=use_time_distributed,
use_bias=False)(
additional_context)
hidden = tf.keras.layers.Activation('elu')(hidden)
hidden = linear_layer(
hidden_layer_size,
activation=None,
use_time_distributed=use_time_distributed)(
hidden)
gating_layer, gate = apply_gating_layer(
hidden,
output_size,
dropout_rate=dropout_rate,
use_time_distributed=use_time_distributed,
activation=None)
if return_gate:
return add_and_norm([skip, gating_layer]), gate
else:
return add_and_norm([skip, gating_layer])
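A quick usage sketch of the GRN (sizes are my own example values; additional_context would be the static context vector when one is available):

x = tf.keras.layers.Input(shape=(10, 16))
grn_output, grn_gate = gated_residual_network(
    x, hidden_layer_size=16, dropout_rate=0.1,
    use_time_distributed=True, return_gate=True)
print(grn_output.shape)  # (None, 10, 16): same size as the skip connection since output_size is None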
6) get_decoder_mask : returns causal mask to apply for self-attention layer
# Attention Components.
def get_decoder_mask(self_attn_inputs):
"""Returns causal mask to apply for self-attention layer.
Args:
self_attn_inputs: Inputs to self attention layer to determine mask shape
"""
len_s = tf.shape(self_attn_inputs)[1]
bs = tf.shape(self_attn_inputs)[:1]
mask = K.cumsum(tf.eye(len_s, batch_shape=bs), 1)
return mask
- tf.shape : returns the shape of a tensor
- tf.eye : identity matrix
  if batch_shape is specified, the overall shape is (batch_shape, identity-matrix shape)
  ex. suppose a variable c has the 4-D shape (2, 3, 10, 10):
>>> X=tf.shape(c)[:1]
>>> Y=tf.shape(c)[1]
>>> Z=tf.eye(Y,batch_shape=X)
>>> Z
<tf.Tensor: shape=(2, 3, 3), dtype=float32, numpy=
array([[[1., 0., 0.],
[0., 1., 0.],
[0., 0., 1.]],
[[1., 0., 0.],
[0., 1., 0.],
[0., 0., 1.]]], dtype=float32)>
- K.cumsum : computes the cumulative sum along the specified axis
>>> K.cumsum(Z, 1)
<tf.Tensor: shape=(2, 3, 3), dtype=float32, numpy=
array([[[1., 0., 0.],
        [1., 1., 0.],
        [1., 1., 1.]],
       [[1., 0., 0.],
        [1., 1., 0.],
        [1., 1., 1.]]], dtype=float32)>
So each mask[b] is lower-triangular: time step t can only attend to steps <= t, which is exactly a causal mask.
7) ScaledDotProductAttention : Defines scaled dot product attention layer
Attribute definitions
- self.dropout
- self.activation
class ScaledDotProductAttention():
"""Defines scaled dot product attention layer.
Attributes:
dropout: Dropout rate to use
activation: Normalisation function for scaled dot product attention (e.g.
softmax by default)
"""
def __init__(self, attn_dropout=0.0):
self.dropout = Dropout(attn_dropout)
self.activation = Activation('softmax')
The attention computation part
def __call__(self, q, k, v, mask):
"""Applies scaled dot product attention.
Args:
q: Queries
k: Keys
v: Values
mask: Masking if required -- sets softmax to very large value
Returns:
Tuple of (layer outputs, attention weights)
"""
temper = tf.sqrt(tf.cast(tf.shape(k)[-1], dtype='float32'))
attn = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[2, 2]) / temper)(
[q, k]) # shape=(batch, q, k)
if mask is not None:
mmask = Lambda(lambda x: (-1e+9) * (1. - K.cast(x, 'float32')))(
mask) # setting to infinity
attn = Add()([attn, mmask])
attn = self.activation(attn)
attn = self.dropout(attn)
output = Lambda(lambda x: K.batch_dot(x[0], x[1]))([attn, v])
return output, attn
- tf.cast : casts a tensor to a new type
- tf.sqrt : square root
- K.batch_dot : dot-product over the specified axes, performed batch-wise over the leading batch dimension
  Honestly this one took me forever to understand — I should write it up separately later =_= In the end it works almost exactly like einsum (see the quick check below).
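A quick check of the batch_dot call used above against the equivalent einsum, on small random tensors of my own (assuming TF 2.x eager execution):

import tensorflow as tf
K = tf.keras.backend

q = tf.random.normal((4, 6, 5))   # (batch, T_q, d_k)
k = tf.random.normal((4, 6, 5))   # (batch, T_k, d_k)

scores_bd = K.batch_dot(q, k, axes=[2, 2])   # (4, 6, 6): contract the d_k axis of both
scores_es = tf.einsum('btd,bsd->bts', q, k)  # same contraction written as einsum
print(bool(tf.reduce_all(tf.abs(scores_bd - scores_es) < 1e-5)))  # True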
8) InterpretableMultiHeadAttention : defines interpretable multi-head attention layer
- stack vs concat (cf. the shape comparison earlier)
Overall this is very close to a typical TensorFlow multi-head attention implementation.
class InterpretableMultiHeadAttention():
  """Defines interpretable multi-head attention layer.

  Attributes:
    n_head: Number of heads
    d_k: Key/query dimensionality per head
    d_v: Value dimensionality
    dropout: Dropout rate to apply
    qs_layers: List of queries across heads
    ks_layers: List of keys across heads
    vs_layers: List of values across heads
    attention: Scaled dot product attention layer
    w_o: Output weight matrix to project internal state to the original TFT
      state size
  """

  def __init__(self, n_head, d_model, dropout):
    """Initialises layer.

    Args:
      n_head: Number of heads
      d_model: TFT state dimensionality
      dropout: Dropout discard rate
    """
    self.n_head = n_head
    self.d_k = self.d_v = d_k = d_v = d_model // n_head
    self.dropout = dropout

    self.qs_layers = []
    self.ks_layers = []
    self.vs_layers = []

    # Use same value layer to facilitate interp
    vs_layer = Dense(d_v, use_bias=False)

    for _ in range(n_head):
      self.qs_layers.append(Dense(d_k, use_bias=False))
      self.ks_layers.append(Dense(d_k, use_bias=False))
      self.vs_layers.append(vs_layer)  # use same vs_layer

    self.attention = ScaledDotProductAttention()
    self.w_o = Dense(d_model, use_bias=False)

  def __call__(self, q, k, v, mask=None):
    """Applies interpretable multihead attention.

    Using T to denote the number of time steps fed into the transformer.

    Args:
      q: Query tensor of shape=(?, T, d_model)
      k: Key of shape=(?, T, d_model)
      v: Values of shape=(?, T, d_model)
      mask: Masking if required with shape=(?, T, T)

    Returns:
      Tuple of (layer outputs, attention weights)
    """
    n_head = self.n_head

    heads = []
    attns = []
    for i in range(n_head):
      qs = self.qs_layers[i](q)
      ks = self.ks_layers[i](k)
      vs = self.vs_layers[i](v)
      head, attn = self.attention(qs, ks, vs, mask)

      head_dropout = Dropout(self.dropout)(head)
      heads.append(head_dropout)
      attns.append(attn)
    head = K.stack(heads) if n_head > 1 else heads[0]
    attn = K.stack(attns)

    outputs = K.mean(head, axis=0) if n_head > 1 else head
    outputs = self.w_o(outputs)
    outputs = Dropout(self.dropout)(outputs)  # output dropout

    return outputs, attn
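A minimal end-to-end sketch tying the causal mask and this attention layer together, on a toy batch of my own (assuming TF 2.x eager execution and the definitions above; d_model=160 and n_head=4 are just example values):

x = tf.random.normal((2, 192, 160))    # (batch, T, d_model), toy values
mask = get_decoder_mask(x)             # (2, 192, 192) lower-triangular causal mask
mha = InterpretableMultiHeadAttention(n_head=4, d_model=160, dropout=0.1)
outputs, attn = mha(x, x, x, mask=mask)
print(outputs.shape)                   # (2, 192, 160)
print(attn.shape)                      # (4, 2, 192, 192): one attention map per head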
9) TFTDataCache : caches data for the TFT
class TFTDataCache(object):
"""Caches data for the TFT."""
_data_cache = {}
@classmethod
def update(cls, data, key):
"""Updates cached data.
Args:
data: Source to update
key: Key to dictionary location
"""
cls._data_cache[key] = data
@classmethod
def get(cls, key):
"""Returns data stored at key location."""
return cls._data_cache[key].copy()
@classmethod
def contains(cls, key):
"""Retuns boolean indicating whether key is present in cache."""
return key in cls._data_cache
- class method : using class methods
  class attributes are accessed through cls !
- cache : caching means storing information and serving it back instead of recomputing it (this is my rough understanding for now)
  In short, these seem to be methods for caching the data.
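A minimal usage sketch of TFTDataCache with a toy DataFrame of my own (just to show the classmethod flow; 'train' is the same cache key the real code uses):

toy_df = pd.DataFrame({'power_usage': [1.0, 2.0, 3.0]})  # hypothetical toy data
TFTDataCache.update(toy_df, 'train')
if TFTDataCache.contains('train'):
    cached = TFTDataCache.get('train')  # get() hands back a copy, not the cached object itself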
10) Finally! 👋 TemporalFusionTransformer : defines the Temporal Fusion Transformer
There are too many methods inside this class to dissect every single one, so I focused on understanding how it flows overall.
- get_tft_embeddings : Transforms raw inputs to embeddings
- inner method (convert_real_to_embedding : Applies linear transformation for time-varying inputs)
- _get_single_col_by_type: returns name of single column for input type
- cache_batched_data : batches and caches data once for use during training
- _batch_sampled_data : samples segments into a compatible format
- _batch_data : batches data for training
- _get_active_locations : formats sample weights for Keras training
- _build_base_graph : returns graph defining layers of the TFT
- static_combine_and_mask : applies variable selection network to static inputs
- lstm_combine_and_mask : applies temporal variable selection networks
- get_lstm : returns LSTM cell initialized with default parameters
- build_model : builds model and defines training losses
- fit
- evaluate
- predict
- format_outputs : returns formatted dataframes for prediction
- get_attention : computes TFT attention weights for a given dataset
Attribute definitions
Attributes:
name: Name of model
time_steps: Total number of input time steps per forecast date (i.e. Width
of Temporal fusion decoder N)
input_size: Total number of inputs
output_size: Total number of outputs
category_counts: Number of categories per categorical variable
n_multiprocessing_workers: Number of workers to use for parallel
computations
column_definition: List of tuples of (string, DataType, InputType) that
define each column
quantiles: Quantiles to forecast for TFT
use_cudnn: Whether to use Keras CuDNNLSTM or standard LSTM layers
hidden_layer_size: Internal state size of TFT
dropout_rate: Dropout discard rate
max_gradient_norm: Maximum norm for gradient clipping
learning_rate: Initial learning rate of ADAM optimizer
minibatch_size: Size of minibatches for training
num_epochs: Maximum number of epochs for training
early_stopping_patience: Maximum number of iterations of non-improvement
before early stopping kicks in
num_encoder_steps: Size of LSTM encoder -- i.e. number of past time steps
before forecast date to use
num_stacks: Number of self-attention layers to apply (default is 1 for basic
TFT)
num_heads: Number of heads for interpretable multi-head attention
model: Keras model for TFT
"""
def __init__(self, raw_params, use_cudnn=False):
"""Builds TFT from parameters.
Args:
raw_params: Parameters to define TFT
use_cudnn: Whether to use CUDNN GPU optimised LSTM
"""
self.name = self.__class__.__name__
params = dict(raw_params) # copy locally
# Data parameters
self.time_steps = int(params['total_time_steps'])
self.input_size = int(params['input_size'])
self.output_size = int(params['output_size'])
self.category_counts = json.loads(str(params['category_counts']))
self.n_multiprocessing_workers = int(params['multiprocessing_workers'])
# Relevant indices for TFT
self._input_obs_loc = json.loads(str(params['input_obs_loc']))
self._static_input_loc = json.loads(str(params['static_input_loc']))
self._known_regular_input_idx = json.loads(
str(params['known_regular_inputs']))
self._known_categorical_input_idx = json.loads(
str(params['known_categorical_inputs']))
self.column_definition = params['column_definition']
# Network params
self.quantiles = [0.1, 0.5, 0.9]
self.use_cudnn = use_cudnn # Whether to use GPU optimised LSTM
self.hidden_layer_size = int(params['hidden_layer_size'])
self.dropout_rate = float(params['dropout_rate'])
self.max_gradient_norm = float(params['max_gradient_norm'])
self.learning_rate = float(params['learning_rate'])
self.minibatch_size = int(params['minibatch_size'])
self.num_epochs = int(params['num_epochs'])
self.early_stopping_patience = int(params['early_stopping_patience'])
self.num_encoder_steps = int(params['num_encoder_steps'])
self.num_stacks = int(params['stack_size'])
self.num_heads = int(params['num_heads'])
# Serialisation options
self._temp_folder = os.path.join(params['model_folder'], 'tmp')
self.reset_temp_folder()
# Extra components to store Tensorflow nodes for attention computations
self._input_placeholder = None
self._attention_components = None
self._prediction_parts = None
print('*** {} params ***'.format(self.name))
for k in params:
print('# {} = {}'.format(k, params[k]))
# Build model
self.model = self.build_model()
hidden_layer_size is the model dimensionality fed into the LSTM encoder/decoder, so the embeddings are also sized to match it.
_input_obs_loc is the index of the target among all columns excluding id and time.
ex. for the columns below, _input_obs_loc is [0]
id | hours_from_start | power_usage | hour | day_of_week | hours_from_start | categorical_id |
---|---|---|---|---|---|---|
real | real | real | real | real | real | categorical |
ID | time (t) | target (y) | known input | known input | known input | static metadata |
_static_input_loc is the index of the static metadata among all columns excluding id and time.
ex. for the columns above, _static_input_loc is [4]
_known_regular_input_idx gives, among the real-valued columns (excluding id and time), the indices of the columns whose input type is known input or static metadata.
ex. for the columns above, _known_regular_input_idx is [1, 2, 3]
_known_categorical_input_idx gives, among the categorical columns (excluding id and time), the indices of the columns whose input type is known input or static metadata.
ex. for the columns above, _known_categorical_input_idx is [0]
time_steps is the width of the full input: for the electricity data, 24 * 7 + 24 * 1 = 24 * 8 = 192
input_size is the number of columns excluding id and time; ex. for the columns above it is 5
output_size is the total number of targets
Embedding: get_tft_embeddings (and convert_real_to_embedding)
return unknown_inputs, known_combined_layer, obs_inputs, static_inputs
The final return consists of 4 values.
Remind) input types in TFT
- y value (target)
- observed inputs
- not observed, known inputs
- static metadata
- ID
- time (t)
unknown_inputs : embedded real-valued observed inputs + categorical observed inputs
known_combined_layer : embedded real-valued known inputs + categorical known inputs
obs_inputs : the embedded target
static_inputs : the embedded static metadata
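As a rough sketch of the inner convert_real_to_embedding helper (reconstructed from its one-line docstring above, not the verbatim source; in the real code hidden_layer_size comes from self.hidden_layer_size): each real-valued input column is simply projected to the TFT state size with a time-distributed linear layer.

def convert_real_to_embedding(x, hidden_layer_size):
  # x: (batch, time, 1) slice holding one real-valued input column
  return tf.keras.layers.TimeDistributed(
      tf.keras.layers.Dense(hidden_layer_size))(x)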
cache_batched_data, _batch_sampled_data, _batch_data
The raw training and validation data are put into batches via _batch_data.
"""Batches data for training.
Converts raw dataframe from a 2-D tabular format to a batched 3-D array
to feed into Keras model.
Args:
data: DataFrame to batch
Returns:
Batched Numpy array with shape=(?, self.time_steps, self.input_size)
"""
sampled_data = {
'inputs': inputs,
'outputs': outputs[:, self.num_encoder_steps:, :],
'active_entries': np.ones_like(outputs[:, self.num_encoder_steps:, :]),
'time': time,
'identifier': identifiers
}
return sampled_data
_batch_sampled_data transforms the data into this format and returns it.
(Of course, _batch_data also puts the data into batches in this same format.)
if num_samples > 0:
TFTDataCache.update(
self._batch_sampled_data(data, max_samples=num_samples), cache_key)
else:
TFTDataCache.update(self._batch_data(data), cache_key)
print('Cached data "{}" updated'.format(cache_key))
Then cache_batched_data caches the data that _batch_sampled_data has converted into a compatible format.
In other words, if no usable train_df is currently available, the cached version is fetched; if one is available, it is converted into the right format via _batch_data and batched.
if train_df is None:
print('Using cached training data')
train_data = TFTDataCache.get('train')
else:
train_data = self._batch_data(train_df)
The other important model layers appear to be built from the layer utility functions defined earlier.