Source

These notes are my rough pass over the official Google code for the TFT paper, skimmed mainly to grasp the overall flow.

The analysis of everything other than the model part is posted in markdown format in the tft folder of the same GitHub repo.

google-research-tft

tft_model.py : contains the full TFT architecture

Import the required modules

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import gc
import json
import os
import shutil

import data_formatters.base
import libs.utils as utils
import numpy as np
import pandas as pd
import tensorflow as tf

Layer definitions

# Layer definitions.
concat = tf.keras.backend.concatenate
stack = tf.keras.backend.stack
K = tf.keras.backend
Add = tf.keras.layers.Add
LayerNorm = tf.keras.layers.LayerNormalization
Dense = tf.keras.layers.Dense
Multiply = tf.keras.layers.Multiply
Dropout = tf.keras.layers.Dropout
Activation = tf.keras.layers.Activation
Lambda = tf.keras.layers.Lambda
  • Keras backend module : lets you handle low-level operations in Keras (in short, my working understanding is that it is the module that makes raw tensor operations such as dot products easy to do); see the short shape sketch after this list

    • tf.keras.backend.concatenate : it takes as input a list of tensors, all of the same shape except for the concatenation axis, and returns a single tensor that is the concatenation of all inputs


    • tf.keras.backend.stack : stacks a list of rank-R tensors into one rank-(R+1) tensor

    • tf.keras.layers.Add : layer that adds a list of inputs. It takes as input a list of tensors, all of the same shape, and returns a single tensor (also of the same shape)

    • tf.keras.layers.Multiply : layer that multiplies (element-wise) a list of inputs

    • tf.keras.layers.Lambda : wraps a simple custom function as a layer

      e.g. tf.keras.layers.Lambda(my_relu)

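A quick shape check of the layers above (a minimal sketch using the imports from the top of the file; the tensor values are arbitrary):

a = tf.ones((2, 3))
b = tf.ones((2, 3))

print(K.concatenate([a, b], axis=-1).shape)      # (2, 6): joined along an existing axis
print(K.stack([a, b], axis=0).shape)             # (2, 2, 3): a new axis is added
print(tf.keras.layers.Add()([a, b]).shape)       # (2, 3): element-wise sum, same shape
print(tf.keras.layers.Multiply()([a, b]).shape)  # (2, 3): element-wise product
print(tf.keras.layers.Lambda(lambda t: t * 2)(a).shape)  # (2, 3): a tiny custom layer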
Default input types

    # Default input types.
    InputTypes = data_formatters.base.InputTypes
    

Remind)

    class InputTypes(enum.IntEnum):
      """Defines input types of each column."""
      TARGET = 0
      OBSERVED_INPUT = 1
      KNOWN_INPUT = 2
      STATIC_INPUT = 3
      ID = 4  # Single column used as an entity identifier
      TIME = 5  # Single column exclusively used as a time index
    

Layer utility functions

1) linear_layer : returns simple Keras linear layer

    def linear_layer(size,
                     activation=None,
                     use_time_distributed=False,
                     use_bias=True):
      """Returns simple Keras linear layer.
      Args:
        size: Output size
        activation: Activation function to apply if required
        use_time_distributed: Whether to apply layer across time
        use_bias: Whether bias should be included in layer
      """
      linear = tf.keras.layers.Dense(size, activation=activation, use_bias=use_bias)
      if use_time_distributed:
        linear = tf.keras.layers.TimeDistributed(linear)
      return linear
    
  • tf.keras.layers.TimeDistributed : applies the wrapped layer to every temporal slice of the input, so the same layer (with shared weights) is applied independently at each time step; a short sketch follows
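A minimal sketch of what that means in practice:

x = tf.ones((4, 10, 8))  # (batch, time, features)
td = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(5))
print(td(x).shape)       # (4, 10, 5): the same Dense(5) applied at all 10 time steps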

2) apply_mlp : applies simple feed-forward network to an input

def apply_mlp(inputs,
              hidden_size,
              output_size,
              output_activation=None,
              hidden_activation='tanh',
              use_time_distributed=False):
  """Applies simple feed-forward network to an input.
  Args:
    inputs: MLP inputs
    hidden_size: Hidden state size
    output_size: Output size of MLP
    output_activation: Activation function to apply on output
    hidden_activation: Activation function to apply on input
    use_time_distributed: Whether to apply across time
  Returns:
    Tensor for MLP outputs.
  """
  if use_time_distributed:
    hidden = tf.keras.layers.TimeDistributed(
        tf.keras.layers.Dense(hidden_size, activation=hidden_activation))(
            inputs)
    return tf.keras.layers.TimeDistributed(
        tf.keras.layers.Dense(output_size, activation=output_activation))(
            hidden)
  else:
    hidden = tf.keras.layers.Dense(
        hidden_size, activation=hidden_activation)(
            inputs)
    return tf.keras.layers.Dense(
        output_size, activation=output_activation)(
            hidden)
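A hedged usage sketch of apply_mlp (the input shape and the hidden/output sizes are my own choices, not values from the repo):

x = tf.ones((4, 10, 8))  # (batch, time, features)
y = apply_mlp(x, hidden_size=16, output_size=3, use_time_distributed=True)
print(y.shape)           # (4, 10, 3): Dense(16, tanh) then Dense(3), applied per time step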


3) apply_gating_layer : applies a GLU to an input

def apply_gating_layer(x,
                       hidden_layer_size,
                       dropout_rate=None,
                       use_time_distributed=True,
                       activation=None):
  """Applies a Gated Linear Unit (GLU) to an input.
  Args:
    x: Input to gating layer
    hidden_layer_size: Dimension of GLU
    dropout_rate: Dropout rate to apply if any
    use_time_distributed: Whether to apply across time
    activation: Activation function to apply to the linear feature transform if
      necessary
  Returns:
    Tuple of tensors for: (GLU output, gate)
  """

  if dropout_rate is not None:
    x = tf.keras.layers.Dropout(dropout_rate)(x)

  if use_time_distributed:
    activation_layer = tf.keras.layers.TimeDistributed(
        tf.keras.layers.Dense(hidden_layer_size, activation=activation))(
            x)
    gated_layer = tf.keras.layers.TimeDistributed(
        tf.keras.layers.Dense(hidden_layer_size, activation='sigmoid'))(
            x)
  else:
    activation_layer = tf.keras.layers.Dense(
        hidden_layer_size, activation=activation)(
            x)
    gated_layer = tf.keras.layers.Dense(
        hidden_layer_size, activation='sigmoid')(
            x)

  return tf.keras.layers.Multiply()([activation_layer,
                                     gated_layer]), gated_layer

(figure: the GLU gate)

Note! There are two return values: the gated output and the gate itself (the sigmoid branch).
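A small shape sketch of those two return values (sizes are illustrative):

x = tf.ones((4, 10, 8))
glu_output, gate = apply_gating_layer(x, hidden_layer_size=8, dropout_rate=0.1)
print(glu_output.shape, gate.shape)  # (4, 10, 8) (4, 10, 8): output = Dense(x) * sigmoid(Dense(x))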

4) add_and_norm : applies skip connection followed by layer normalisation

  • x_list : a (the original input, i.e. the skip connection) + the gated_layer output
def add_and_norm(x_list):
  """Applies skip connection followed by layer normalisation.
  Args:
    x_list: List of inputs to sum for skip connection
  Returns:
    Tensor output from layer.
  """
  tmp = Add()(x_list)
  tmp = LayerNorm()(tmp)
  return tmp

5) gated_residual_network : applies GRN as defined in paper

def gated_residual_network(x,
                           hidden_layer_size,
                           output_size=None,
                           dropout_rate=None,
                           use_time_distributed=True,
                           additional_context=None,
                           return_gate=False):
  """Applies the gated residual network (GRN) as defined in paper.
  Args:
    x: Network inputs
    hidden_layer_size: Internal state size
    output_size: Size of output layer
    dropout_rate: Dropout rate if dropout is applied
    use_time_distributed: Whether to apply network across time dimension
    additional_context: Additional context vector to use if relevant
    return_gate: Whether to return GLU gate for diagnostic purposes
  Returns:
    Tuple of tensors for: (GRN output, GLU gate)
  """

  # Setup skip connection
  if output_size is None:
    output_size = hidden_layer_size
    skip = x
  else:
    linear = Dense(output_size)
    if use_time_distributed:
      linear = tf.keras.layers.TimeDistributed(linear)
    skip = linear(x)

  # Apply feedforward network
  hidden = linear_layer(
      hidden_layer_size,
      activation=None,
      use_time_distributed=use_time_distributed)(
          x)
  if additional_context is not None:
    hidden = hidden + linear_layer(
        hidden_layer_size,
        activation=None,
        use_time_distributed=use_time_distributed,
        use_bias=False)(
            additional_context)
  hidden = tf.keras.layers.Activation('elu')(hidden)
  hidden = linear_layer(
      hidden_layer_size,
      activation=None,
      use_time_distributed=use_time_distributed)(
          hidden)

  gating_layer, gate = apply_gating_layer(
      hidden,
      output_size,
      dropout_rate=dropout_rate,
      use_time_distributed=use_time_distributed,
      activation=None)

  if return_gate:
    return add_and_norm([skip, gating_layer]), gate
  else:
    return add_and_norm([skip, gating_layer])

(figure: the gated residual network (GRN) block)
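A hedged usage sketch of the GRN with an optional context vector (all sizes here are my own choices):

x = tf.ones((4, 10, 8))        # temporal input
context = tf.ones((4, 10, 8))  # additional (static) context, already repeated across time
grn_output, glu_gate = gated_residual_network(
    x,
    hidden_layer_size=8,
    dropout_rate=0.1,
    use_time_distributed=True,
    additional_context=context,
    return_gate=True)
print(grn_output.shape)        # (4, 10, 8): gated feed-forward + skip connection, then LayerNorm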

6) get_decoder_mask : returns causal mask to apply for self-attention layer

# Attention Components.
def get_decoder_mask(self_attn_inputs):
  """Returns causal mask to apply for self-attention layer.
  Args:
    self_attn_inputs: Inputs to self attention layer to determine mask shape
  """
  len_s = tf.shape(self_attn_inputs)[1]
  bs = tf.shape(self_attn_inputs)[:1]
  mask = K.cumsum(tf.eye(len_s, batch_shape=bs), 1)
  return mask
  • tf.shape : returns the shape of a tensor

  • tf.eye : identity matrix

    if batch_shape is given, the overall shape becomes (batch_shape, identity-matrix shape)

    e.g. if a variable c has the 4-D shape (2, 3, 10, 10):

>>> X=tf.shape(c)[:1]
>>> Y=tf.shape(c)[1]
>>> Z=tf.eye(Y,batch_shape=X)
>>> Z
<tf.Tensor: shape=(2, 3, 3), dtype=float32, numpy=
array([[[1., 0., 0.],
        [0., 1., 0.],
        [0., 0., 1.]],

       [[1., 0., 0.],
        [0., 1., 0.],
        [0., 0., 1.]]], dtype=float32)>
  • K.cumsum : computes the cumulative sum along the specified axis

    >>> K.cumsum(Z,1)
    <tf.Tensor: shape=(2, 3, 3), dtype=float32, numpy=
    array([[[1., 0., 0.],
            [1., 1., 0.],
            [1., 1., 1.]],
      
           [[1., 0., 0.],
            [1., 1., 0.],
            [1., 1., 1.]]], dtype=float32)>
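get_decoder_mask simply wraps this: for an input of shape (batch, time, features) it returns the (batch, time, time) lower-triangular causal mask built exactly as above.

x = tf.zeros((2, 3, 8))     # (batch, time, features)
print(get_decoder_mask(x))  # the same (2, 3, 3) lower-triangular mask as the cumsum output above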
    

7) ScaledDotProductAttention : Defines scaled dot product attention layer

Attribute definitions

  • self.dropout
  • self.activation
class ScaledDotProductAttention():
  """Defines scaled dot product attention layer.
  Attributes:
    dropout: Dropout rate to use
    activation: Normalisation function for scaled dot product attention (e.g.
      softmax by default)
  """

  def __init__(self, attn_dropout=0.0):
    self.dropout = Dropout(attn_dropout)
    self.activation = Activation('softmax')

The attention computation

def __call__(self, q, k, v, mask):
    """Applies scaled dot product attention.
    Args:
      q: Queries
      k: Keys
      v: Values
      mask: Masking if required -- sets softmax to very large value
    Returns:
      Tuple of (layer outputs, attention weights)
    """
    temper = tf.sqrt(tf.cast(tf.shape(k)[-1], dtype='float32'))
    attn = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[2, 2]) / temper)(
        [q, k])  # shape=(batch, q, k)
    if mask is not None:
      mmask = Lambda(lambda x: (-1e+9) * (1. - K.cast(x, 'float32')))(
          mask)  # setting to infinity
      attn = Add()([attn, mmask])
    attn = self.activation(attn)
    attn = self.dropout(attn)
    output = Lambda(lambda x: K.batch_dot(x[0], x[1]))([attn, v])
    return output, attn
  • tf.cast : casts a tensor to a new type

  • tf.sqrt : square root

  • K.batch_dot : dot product over the specified axes, computed batch-wise

    Honestly this one took me a while to understand; I should write it up separately later =_= In the end it behaves almost exactly like einsum; see the sketch below.
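A sketch of that equivalence, following the axes used in the attention code above:

q = tf.random.normal((2, 7, 4))  # (batch, T, d_k)
k = tf.random.normal((2, 7, 4))
scores_bd = K.batch_dot(q, k, axes=[2, 2])                # (2, 7, 7), like the attention scores above
scores_es = tf.einsum('btd,bsd->bts', q, k)               # the equivalent einsum
print(np.allclose(scores_bd.numpy(), scores_es.numpy()))  # True (up to float error)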

8) InterpretableMultiHeadAttention : defines interpretable multi-head attention layer

  • stack vs concat : stack adds a new axis (rank R to R+1), while concatenate joins tensors along an existing axis

    Overall it is almost the same as implementing multi-head attention directly in TensorFlow code.

    class InterpretableMultiHeadAttention():
      """Defines interpretable multi-head attention layer.
      Attributes:
        n_head: Number of heads
        d_k: Key/query dimensionality per head
        d_v: Value dimensionality
        dropout: Dropout rate to apply
        qs_layers: List of queries across heads
        ks_layers: List of keys across heads
        vs_layers: List of values across heads
        attention: Scaled dot product attention layer
        w_o: Output weight matrix to project internal state to the original TFT
          state size
      """
      
      def __init__(self, n_head, d_model, dropout):
        """Initialises layer.
        Args:
          n_head: Number of heads
          d_model: TFT state dimensionality
          dropout: Dropout discard rate
        """
      
        self.n_head = n_head
        self.d_k = self.d_v = d_k = d_v = d_model // n_head
        self.dropout = dropout
      
        self.qs_layers = []
        self.ks_layers = []
        self.vs_layers = []
      
        # Use same value layer to facilitate interp
        vs_layer = Dense(d_v, use_bias=False)
      
        for _ in range(n_head):
          self.qs_layers.append(Dense(d_k, use_bias=False))
          self.ks_layers.append(Dense(d_k, use_bias=False))
          self.vs_layers.append(vs_layer)  # use same vs_layer
      
        self.attention = ScaledDotProductAttention()
        self.w_o = Dense(d_model, use_bias=False)
      
      def __call__(self, q, k, v, mask=None):
        """Applies interpretable multihead attention.
        Using T to denote the number of time steps fed into the transformer.
        Args:
          q: Query tensor of shape=(?, T, d_model)
          k: Key of shape=(?, T, d_model)
          v: Values of shape=(?, T, d_model)
          mask: Masking if required with shape=(?, T, T)
        Returns:
          Tuple of (layer outputs, attention weights)
        """
        n_head = self.n_head
      
        heads = []
        attns = []
        for i in range(n_head):
          qs = self.qs_layers[i](q)
          ks = self.ks_layers[i](k)
          vs = self.vs_layers[i](v)
          head, attn = self.attention(qs, ks, vs, mask)
      
          head_dropout = Dropout(self.dropout)(head)
          heads.append(head_dropout)
          attns.append(attn)
        head = K.stack(heads) if n_head > 1 else heads[0]
        attn = K.stack(attns)
      
        outputs = K.mean(head, axis=0) if n_head > 1 else head
        outputs = self.w_o(outputs)
        outputs = Dropout(self.dropout)(outputs)  # output dropout
      
        return outputs, attn
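A hedged usage sketch running the class eagerly (d_model, n_head and the sequence length are my own choices):

d_model, n_head = 16, 4
imha = InterpretableMultiHeadAttention(n_head, d_model, dropout=0.1)
x = tf.random.normal((2, 192, d_model))  # (batch, T, d_model)
mask = get_decoder_mask(x)               # (2, 192, 192) causal mask
outputs, attn = imha(x, x, x, mask=mask)
print(outputs.shape)  # (2, 192, 16): per-head outputs averaged, then projected by w_o
print(attn.shape)     # (4, 2, 192, 192): attention weights stacked across the 4 heads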
    

9) TFTDataCache : caches data for the TFT

class TFTDataCache(object):
  """Caches data for the TFT."""

  _data_cache = {}

  @classmethod
  def update(cls, data, key):
    """Updates cached data.
    Args:
      data: Source to update
      key: Key to dictionary location
    """
    cls._data_cache[key] = data

  @classmethod
  def get(cls, key):
    """Returns data stored at key location."""
    return cls._data_cache[key].copy()

  @classmethod
  def contains(cls, key):
    """Retuns boolean indicating whether key is present in cache."""

    return key in cls._data_cache

  • classmethod : the cache methods are class methods

    class attributes are accessed through cls!

  • cache : caching means storing a result so it can be returned later without recomputing it (my working understanding for now)

    In short, this class simply caches the batched data; a tiny usage sketch follows this list.
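A tiny usage sketch of that interface with a toy DataFrame:

df = pd.DataFrame({'power_usage': [1.0, 2.0, 3.0]})
TFTDataCache.update(df, 'train')
print(TFTDataCache.contains('train'))  # True
cached = TFTDataCache.get('train')     # returns a copy, so the cached original stays untouched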

10) Finally! 👋 TemporalFusionTransformer : defines the Temporal Fusion Transformer

The class contains too many methods to dissect one by one, so I focused on understanding the overall flow.

  • get_tft_embeddings : transforms raw inputs to embeddings
  • inner method (convert_real_to_embedding : applies a linear transformation to real-valued time-varying inputs)
  • _get_single_col_by_type : returns the name of the single column for a given input type
  • cache_batched_data : batches and caches data once for use during training
  • _batch_sampled_data : samples segments into a compatible format
  • _batch_data : batches data for training
  • _get_active_locations : formats sample weights for Keras training
  • _build_base_graph : returns the graph defining the layers of the TFT
  • static_combine_and_mask : applies the variable selection network to static inputs
  • lstm_combine_and_mask : applies the temporal variable selection networks
  • get_lstm : returns an LSTM cell initialised with default parameters
  • build_model : builds the model and defines training losses
  • fit
  • evaluate
  • predict
  • format_outputs : returns formatted dataframes for predictions
  • get_attention : computes TFT attention weights for a given dataset

Attribute definitions

Attributes:
    name: Name of model
    time_steps: Total number of input time steps per forecast date (i.e. Width
      of Temporal fusion decoder N)
    input_size: Total number of inputs
    output_size: Total number of outputs
    category_counts: Number of categories per categorical variable
    n_multiprocessing_workers: Number of workers to use for parallel
      computations
    column_definition: List of tuples of (string, DataType, InputType) that
      define each column
    quantiles: Quantiles to forecast for TFT
    use_cudnn: Whether to use Keras CuDNNLSTM or standard LSTM layers
    hidden_layer_size: Internal state size of TFT
    dropout_rate: Dropout discard rate
    max_gradient_norm: Maximum norm for gradient clipping
    learning_rate: Initial learning rate of ADAM optimizer
    minibatch_size: Size of minibatches for training
    num_epochs: Maximum number of epochs for training
    early_stopping_patience: Maximum number of iterations of non-improvement
      before early stopping kicks in
    num_encoder_steps: Size of LSTM encoder -- i.e. number of past time steps
      before forecast date to use
    num_stacks: Number of self-attention layers to apply (default is 1 for basic
      TFT)
    num_heads: Number of heads for interpretable multi-head attention
    model: Keras model for TFT
  """
 def __init__(self, raw_params, use_cudnn=False):
    """Builds TFT from parameters.
    Args:
      raw_params: Parameters to define TFT
      use_cudnn: Whether to use CUDNN GPU optimised LSTM
    """

    self.name = self.__class__.__name__

    params = dict(raw_params)  # copy locally

    # Data parameters
    self.time_steps = int(params['total_time_steps'])
    self.input_size = int(params['input_size'])
    self.output_size = int(params['output_size'])
    self.category_counts = json.loads(str(params['category_counts']))
    self.n_multiprocessing_workers = int(params['multiprocessing_workers'])

    # Relevant indices for TFT
    self._input_obs_loc = json.loads(str(params['input_obs_loc']))
    self._static_input_loc = json.loads(str(params['static_input_loc']))
    self._known_regular_input_idx = json.loads(
        str(params['known_regular_inputs']))
    self._known_categorical_input_idx = json.loads(
        str(params['known_categorical_inputs']))

    self.column_definition = params['column_definition']

    # Network params
    self.quantiles = [0.1, 0.5, 0.9]
    self.use_cudnn = use_cudnn  # Whether to use GPU optimised LSTM
    self.hidden_layer_size = int(params['hidden_layer_size'])
    self.dropout_rate = float(params['dropout_rate'])
    self.max_gradient_norm = float(params['max_gradient_norm'])
    self.learning_rate = float(params['learning_rate'])
    self.minibatch_size = int(params['minibatch_size'])
    self.num_epochs = int(params['num_epochs'])
    self.early_stopping_patience = int(params['early_stopping_patience'])

    self.num_encoder_steps = int(params['num_encoder_steps'])
    self.num_stacks = int(params['stack_size'])
    self.num_heads = int(params['num_heads'])

    # Serialisation options
    self._temp_folder = os.path.join(params['model_folder'], 'tmp')
    self.reset_temp_folder()

    # Extra components to store Tensorflow nodes for attention computations
    self._input_placeholder = None
    self._attention_components = None
    self._prediction_parts = None

    print('*** {} params ***'.format(self.name))
    for k in params:
      print('# {} = {}'.format(k, params[k]))

    # Build model
    self.model = self.build_model()

hidden_layer_size is the model dimension fed into the LSTM encoder/decoder, so the embeddings are also sized to match it.

_input_obs_loc is the index of the target among all columns excluding id and time.

e.g. for the columns below, _input_obs_loc is [0]

| id | hours_from_start | power_usage | hour | day_of_week | hours_from_start | categorical_id |
|---|---|---|---|---|---|---|
| real | real | real | real | real | real | categorical |
| ID | time (t) | target (y) | known input | known input | known input | static metadata |

_static_input_loc is the index of the static metadata among all columns excluding id and time.

e.g. for the columns above, _static_input_loc is [4]

_known_regular_input_idx holds, among the real-valued columns (excluding id and time), the indices of the inputs whose type is known or static metadata.

e.g. for the columns above, _known_regular_input_idx is [1, 2, 3]

_known_categorical_input_idx holds, among the categorical columns (excluding id and time), the indices of the inputs whose type is known or static metadata.

e.g. for the columns above, _known_categorical_input_idx is [0]

time_steps is the width of the full input window: for the electricity data, 24 * 7 + 24 * 1 = 24 * 8 = 192.

input_size is the total number of columns excluding id and time, e.g. 5 for the columns above.

output_size is the total number of targets.
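Putting the index examples together, a hypothetical raw_params sketch for these columns could look like the following (only the index fields mirror the examples above; every other value is a placeholder, not the real electricity configuration):

raw_params = {
    'total_time_steps': 192,       # 24 * 7 encoder steps + 24 decoder steps
    'num_encoder_steps': 168,      # assumption: one week of past steps before the forecast date
    'input_size': 5,               # columns excluding id and time
    'output_size': 1,              # single target (power_usage)
    'input_obs_loc': '[0]',
    'static_input_loc': '[4]',
    'known_regular_inputs': '[1, 2, 3]',
    'known_categorical_inputs': '[0]',
    'category_counts': '[369]',    # placeholder: vocabulary size of categorical_id
    'multiprocessing_workers': 5,  # placeholder
    # ... plus column_definition, model_folder and the network/training parameters listed above
}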

Embedding : get_tft_embeddings (and the inner convert_real_to_embedding)

return unknown_inputs, known_combined_layer, obs_inputs, static_inputs

The final return consists of four values in total.

Remind) input types in TFT

  • target (y)

  • observed inputs
  • known inputs (not observed, known in advance)
  • static metadata
  • ID
  • time (t)

unknown_inputs : embedded real-valued observed inputs + categorical observed inputs

known_combined_layer : embedded real-valued known inputs + categorical known inputs

obs_inputs : the embedded target

static_inputs : the embedded static metadata

cache_batched_data, _batch_sampled_data, _batch_data

The raw training and validation data are put into batches through _batch_data.

"""Batches data for training.
    Converts raw dataframe from a 2-D tabular format to a batched 3-D array
    to feed into Keras model.
    Args:
      data: DataFrame to batch
    Returns:
      Batched Numpy array with shape=(?, self.time_steps, self.input_size)
    """


sampled_data = {
        'inputs': inputs,
        'outputs': outputs[:, self.num_encoder_steps:, :],
        'active_entries': np.ones_like(outputs[:, self.num_encoder_steps:, :]),
        'time': time,
        'identifier': identifiers
    }

    return sampled_data

_batch_sampled_data transforms the data into this format and returns it.

(Of course, _batch_data also batches the data in exactly the same format.)
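Under the electricity-style settings above (192 total steps, 5 inputs, 1 target and, as an assumption, 168 encoder steps), the batched dictionaries would therefore look roughly like this:

train_data = TFTDataCache.get('train')
train_data['inputs'].shape          # (num_samples, 192, 5)
train_data['outputs'].shape         # (num_samples, 192 - 168, 1) == (num_samples, 24, 1)
train_data['active_entries'].shape  # same shape as outputs, filled with ones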

 if num_samples > 0:
      TFTDataCache.update(
          self._batch_sampled_data(data, max_samples=num_samples), cache_key)
    else:
      TFTDataCache.update(self._batch_data(data), cache_key)

    print('Cached data "{}" updated'.format(cache_key))

Then cache_batched_data caches the data after _batch_sampled_data has converted it into a compatible format.

In other words, if no usable train_df is available, the cached version is fetched; if one is available, it is converted into the right format and batched through the _batch_data method.

if train_df is None:
      print('Using cached training data')
      train_data = TFTDataCache.get('train')
    else:
      train_data = self._batch_data(train_df)

The other important model layers appear to be built on top of the layer utility functions defined earlier.