nokoのブログ

こちらは暫定のメモ置き場ですので悪しからず

データエンジニアリング実践

はじめに

  • データエンジニアリングまわりを復習したときのメモです。
  • Webアプリから発生するデータを使ってモデルを継続的に学習させるためのデータ基盤を想定しています。

データエンジニアリング導入以前ver

概要

  • CSVファイル
    • PoC時点などのシンプルな実装だと、サーバ腹持ちCSVファイルのみでデータを管理する、ということもあると思います
  • リレーショナルデータベース(RDB
    • 同時実行制御・耐障害性
    • 一貫性・拡張性・信頼性・速度

関連技術要素

課題

  • Webアプリケーションデータベースとは疎にしたい(分ければ良いが)
  • 性能が出ない
    • 統合データ環境として作られていると、分析用途のクエリは性能が出にくい(アドホックな分析を想定して設計されていない)
  • インフラコストが高い
    • 利用するデータを格納する→事前に必要になりそうなデータをとにかく集めておく

データウェアハウス導入ver

概要

  • データウェアハウス
    • ELTがメイン
    • SQL/非構造化データを扱わないなら
    • 集約することで、データのサイロ化を防ぐ(Dx条件の一つ)
    • 設計のポイント
      • データ保存量の拡張手段(制約がないか)
      • データの取り出し手段

関連技術要素

  • 【DWH】BigQuery(GCP
    • コスト
      • マルチテナント方式であるため、事前のプロビジョニングのコストが発生しない
        • コンピュートまたはストレージのどちらか大きいほうのピークに合わせてサイジング調整する必要などがない
      • クエリでスキャンした容量に対して課金
        • カラム指向ストレージなので、LIMITは意味なし
        • デフォルトで作られるキャッシュテーブル(24h)は、スキャン料金の対象にならない
    • データ取り込み
      • バルクロード: BigQuery Data Transfer Service, など
      • ストリーミング挿入: Fluentd, Dataflowのコネクタ, StriimなどのCDCソリューション(MySQLなどと同期)
      • フェデレーション: 外部テーブルを指定
    • データマート用途での利用
      • クエリキャッシュ、データマートテーブル作成、ビュー、マテリアライズドビュー
      • INSERTはいいが変更DMLは苦手なので、中間テーブルを次々にCTASで作成するのが良い(テーブル有効期限も合わせて設定)
      • データマートも兼ねたり。中間データの作成など。
    • 1レコード500バイトと仮定すると、1千万レコードで5GBで、それくらいならRDBをデータマートにしても良い?
    • cdc

課題

  • 非構造化データを扱うことが増えてきている
    • 顧客接点のデジタル化
    • 「2025年までに、全世界のデータのうち80%が非構造化データを占めるようになる」

データレイク導入ver

構成イメージ

スクリーンショット 2021-03-14 11.56.15.png

概要

  • データレイク
    • 未加工のデータを収集して蓄積するための場所
    • 「データを中心としたアーキテクチャとなっており、データのサイロが最小化され、スケーラブルな分散環境でデータの処理が互いに干渉し合うことなく実行される場所」
  • データマート
  • ストリーミング処理
  • ETL
    • 1ファイルあたり128MB-512MBが良い
    • パーティション含めたデータの持ち方の設計が大事
  • ワークフロー管理
    • 処理の依存関係を制御、処理のタイミングを適切にコントロールする
  • メタデータ管理
    • データリネージ
      • あるデータがどのデータからどういう過程を経て作られたか

関連技術要素

  • 【データレイク】S3
    • オブジェクトストレージに格納するデータは、1MB~1GBくらいが良い
  • 【ストリーミング処理】Fluentd
  • 【ストリーミング処理】Kafka
  • 【ストリーミング処理】Pub/Sub(GCP
    • データ量の急なバーストに耐え得るスケーラビリティ
  • 【ストリーミング処理】Dataflow
    • Apache Beam
    • リアルタイムに整形・集計などのデータ処理をするパイプラインを構築する
    • 外部サービスとのI/Oコネクタが標準で提供されている
    • Pub/Subで収集→Dataflowで処理→BQで蓄積、など
    • S3に書き出すこともできる
    • DataflowSQLもある
    • ノートブックもある
    • SpotifyはリアルタイムレコメンドにApacheBeamを利用
  • 【ストリーミング処理】Kinesis Data Firehose
  • 【ETL】DataProc(GCP
    • マネージドHadoop/Spark
    • ファイルパスの接頭辞をhdfs://からgs://へ変更するだけでGCSにアクセスすることができる
    • 90sで起動
    • メモリを追加してPresto, GPUを追加してSparkで機械学習もできる
    • DataProcHubでJupyterNotebookも利用可能
    • DataProcでGCS上にHiveパーティションデータを作る→それをBQで直接触る、などもできる
    • BQのストレージをDataProcから直接触ることもできる(BigQueryStorageAPIの利用)
  • 【ETL】Glue(AWS
    • DataCatalogとETLという2つのコンポーネントから構成されるサービス
    • 「S3のデータファイルから、AWS Redshift Spectrum と AWS Athenaのテーブルを作成する」ツール
      • Glueでデータカタログと呼んでいるテーブル定義は、Apache Hive のTableのことで、 Glueは、S3のファイルから、Hive Tableを作るツール
    • Glueはクローリングによるテーブル定義作成・更新に加えて、Apache Sparkを使って、プログラミングにより、ユーザーがより細かくデータ加工することもできます。
    • AWS GlueのジョブにはSparkとPython Shellの2つのジョブタイプがあります。
    • データの圧縮やParquet変換
    • マスキングやパーティション
  • 【ETL】Athena(AWS
    • Presto
      • Hiveと違って、速い分、エラーが起きると最初からやり直す(Hiveは大規模なバッチ処理を着実に実行することに長けたクエリエンジン)
        • データの構造化は、HiveやSparkで実行する
    • S3以外もデータソースにできる
    • 読み込むサイズで料金が変わる→gzipするだけでも安くなる
  • 【ワークフロー管理】CloudComposer
    • Airflowに必要なコンポーネントが自動で起動する
      • GKE
      • GCS
      • CloudLogging
      • CloudMonitoring
      • CloudSQL(バックエンドDB)
  • メタデータ管理】DataCatalog(GCP
    • GCSのデータやBQのデータを横断的に検索・管理することができる

その他メモ

参考

Kaggleコンペ開始直後にやっていること

はじめに

  • はじめてKaggleのコンペに参加してみたので、初動でやったことの大まかな流れを備忘までに残しておきます。

やったこと

0. 前提

  • Kaggleアカウントは作成しておくこと

1. コンペ参加登録

  • Kaggleページ→compete→sample_compete(対象コンペ)→Join Competition

2. GitHubリポジトリ作成

3. Issue(初期)作成

  • フォーマット
    • タイトル
    • 完了要件
    • 実施内容
  • Issueタイトル
      1. 環境構築
      1. サンプル公開ノートブック確認
      2. Input/Outputデータ確認
      3. 処理フロー確認
      1. EDA
      1. サーベイ
      1. 実験計画立て・実験管理シート作成
  • (補足)ノートブックコンペの場合は、スプレッドシート作成(「Issue」シート)

4. 環境構築

  • (補足)ノートブックコンペの場合は対応不要

4-1. Docker環境構築

4-2. Kaggle API認証設定

  • kaggleのpipパッケージインストール
    • → requirements.txtに kaggle を追記してコンテナを再作成する
    • (暫定) pip install kaggle
  • kaggle認証
    • 事前にkeyを確認しておく
    • ホスト側でexportしておく
    • → コンテナ起動時にそれを引き継ぐようにrun.shを修正する
# ホスト側で実行(bashrcなどに設定しておく)
$ export KAGGLE_USERNAME=kaggle_username
$ export KAGGLE_KEY=xxxxxxxxxxxxxx
...
-e KAGGLE_USERNAME=${KAGGLE_USERNAME} \
-e KAGGLE_KEY=${KAGGLE_KEY} \
...
  • Kaggle API 動作確認
$ kaggle competitions list

4-3. Readme作成

  • README作成
    • 概要
      • コンペのリンク+α
    • 環境構築手順
      • 上記の環境構築手順(KaggleAPI用環境変数設定、コンテナ起動)
    • スクリプト実行手順
    • Input/Outputデータサマリ
    • 実験結果サマリ
  • → git pull requestまで

5. サンプル公開ノートブック確認

  • Input/Outputデータを確認する
    • Readmeに追記する
      • Input/Outputデータ
        • データ種
        • データ量
        • カラム
  • 処理フローを確認する
    • 前処理→学習→推論
    • スクリプト(ノートブック)・関数の切り方

6. EDA

  • このタイミングで、Inputデータ取得・読み込み周りを実装する
  • 手法
    • pandas profiling など
    • head(), info(), shape, unique, display image, wordcloud, basic nlp -> count, distributed

7. サーベイ

  • 公開ノートブック・関連論文を調査する
  • →試し得る手法の洗い出し、優先順位付けをする

8. 実験計画立て・実験管理シート作成

  • EDAサーベイの結果を元に、実験計画を立てる
  • 予実管理用の実験管理シートを作成する
    • No, タイトル, 関連IssueNo, スクリプト, 変更元, 変更内容, 補足, 結果

9. 個別Issue起票・対応

  • 実験計画に従い、Issueを起票し、合わせてブランチを作成して実験していく
  • ブランチ切り替え(別Issueに取り組む度に実施する)
# mainブランチを最新にする
$ git checkout main
$ git status
$ git pull origin main
# → 上記同様、新たなブランチを作成する

バックアップリストア改廃を復習してみた

はじめに

  • バックアップリストア改廃を復習してみたときのメモです
  • 5W1Hを意識して漏れなく設計します

一覧

  • 取得元
  • バックアップ
    • 方式
    • トリガ
    • タイミング
  • リストア
    • 方式
  • 改廃(取得元)
    • 方式
    • トリガ
    • ローカル保存世代数
  • 改廃(バックアップ先)
    • 方式
    • トリガ
    • タイミング
    • ローカル保存世代数

データエンジニアリングを復習してみた

概要

スクリーンショット 2021-03-14 11.56.15.png

選定ポイント

  • MPPデータベースは、最初にETLプロセスなどでデータを取り込むための手順が必要
    • 元のデータがCSVJSONで、複雑な加工処理が不要なら、オブジェクトストレージからデータウェアハウスに直接転送してSQLで加工、でも良い
    • BIツールとMPPデータの組み合わせは実績があるので、可視化のためのデータマートとして使える
  • Hiveは大規模なバッチ処理を着実に実行することに長けたクエリエンジン
  • PrestoはHiveとは逆で、速い分、エラーが起きると最初からやり直す
    • データの構造化は、HiveやSpark
  • 1レコード500バイトと仮定すると、1千万レコードで5GBで、それくらいならRDBをデータマートにするのが良い

補足

  • ポイントは
    • データ保存量の拡張手段(制約がないか)
    • データの取り出し手段
  • 分散ストレージとしてNoSQLを使うときもある
  • オブジェクトストレージに格納するデータは、1MB~1GBくらいが良い
  • データフローは、分散ストレージに格納した後から
    • オブジェクトストレージなどからデータ読み込んで、構造化されたテーブルを作成する
    • ETLプロセスの一種
  • Glue
    • 「S3のデータファイルから、AWS Redshift Spectrum と AWS Athenaのテーブルを作成する」ツール
      • 実質、AWS GlueはAWS EMRで作られていて、テーブル定義はAWS EMRのHive Tableでも作ることができます。
    • Glueでデータカタログと呼んでいるテーブル定義は、Apache Hive のTableのことで、 Glueは、S3のファイルから、Hive Tableを作るツール
    • Glueはクローリングによるテーブル定義作成・更新に加えて、Apache Sparkを使って、プログラミングにより、ユーザーがより細かくデータ加工することもできます。
    • AWS GlueのジョブにはSparkとPython Shellの2つのジョブタイプがあります。
      • Sparkジョブではutf-8形式のデータのみにしか対応していない
  • Athena
    • Athena は初期状態で AWS Glueデータカタログと統合されており、さまざまなサービスにわたるメタデータの統合リポジトリを作成できます。データソースのクロールとスキーマの解析、新規および修正したテーブル定義とパーティション定義のカタログへの入力、スキーマのバージョニング保持が可能です。
  • データマートは集約と可視化の間に挟む
  • Hive metastoreは内部でPostgresを使ったりする

参考

MLモデル評価で考えること

はじめに

  • MLモデル評価のまとめ方メモです

メモ

  • 全体を通して、「つまり、現在実践で使えるレベルか?」「課題はどこで、クリアできそうか?それがクリアできたらどれくらい良くなりそうか?」が伝わるようにする
  • 網羅感大事

1. 検証計画再掲

  • スコープ
  • 検証パターン一覧
  • スケジュール、など

2. 定量評価まとめ

  • 前提条件
    • 学習データ、テストデータ
    • 評価方法
  • 混同行列
    • プロジェクトの性質に応じてprecisionとrecallを使い分ける
    • モデルの評価にはF値やIoUを使う
  • 課題の分類は、精度×データ量のマトリクスなど
    • そもそものカテゴリ分類でも

3. 定性評価まとめ

  • 良い例も載せる
  • 課題を書くなら、対応策もセットで書く

4. 考察、ネクストアクション

  • 推論結果の閾値は、妥当な指標が一番高くなるところにするか、あるタグの頻度の妥当性で決めるか
  • 課題を一覧化する。それぞれが、どれくらいの重要度か整理する。(どれくらい精度が上がりそうか)

Macの初期セットアップ手順メモ(vivaldi, vscode, xonsh, etc)(2021)

はじめに

  • Macを初期セットアップしたときのメモ。

キーバインド設定

  • システム環境設定 > キーボード > caps lock -> command(キーボードごとに)
  • システム環境設定 > キーボード > ショートカット > 入力ソースの変換(英数かな文字変換) command + space (Spotlightは ctrl + space に)
  • 辞書登録

デスクトップ設定

Docks設定

Vivaldi

Chrome

Clipy

  • command × 2
  • command + ctrl + R
  • command + ctrl + S

Alfred

Line

Zoom

Slack

Boost Note(EverNote

ToDoist

iTerm2

  • カラー
    • Tango Dark
  • 文字サイズ
    • 16
  • ホットキー設定+デスクトップ上に被せてフルスクリーンで表示
    • iTerm2 > Preferences > General > Window > Native full screen windowsのチェックを外す
    • iTerm2 > Preferences > Profiles > Window > Settings for New Windows > Style, Screen, Space
    • iTerm2 > Preferences > Keys > HotKey > Show/hide iTerm2 with a system-wide hotkeyにチェック
    • command 2回をホットキー設定(Profilesの方から)
    • pin hotkey

ssh設定

  • .ssh/config設定
Host own-dev-app01
    HostName 3.14.15.92
    Port 22
    User ec2-user
    IdentityFile ~/.ssh/own-dev-key.pem
    IdentitiesOnly yes
    TCPKeepAlive yes

vim

# 事前にbrewのインストール
# echo 'export PATH="/opt/homebrew/bin:$PATH"' >> ~/.zshrc
brew install vim
~/.vimrc
"----------------------------------------
" setting
"----------------------------------------
"文字コードをUFT-8に設定
set fenc=utf-8
" バックアップファイルを作らない
set nobackup
" スワップファイルを作らない
set noswapfile
" 編集中のファイルが変更されたら自動で読み直す
set autoread
" バッファが編集中でもその他のファイルを開けるように
set hidden
" 入力中のコマンドをステータスに表示する
set showcmd

"----------------------------------------
" 見た目系
"----------------------------------------
" 行番号を表示
set number
" タイトルを表示
set title
" 現在の行を強調表示
set cursorline
" 行末の1文字先までカーソルを移動できるように
set virtualedit=onemore
" 括弧入力時の対応する括弧を表示
set showmatch
" ステータスラインを常に表示
set laststatus=2
" コマンドラインの補完
set wildmode=list:longest
" 折り返し時に表示行単位での移動できるようにする
nnoremap j gj
nnoremap k gk
" シンタックスハイライトの有効化
syntax enable
" コメントの色を水色
hi Comment ctermfg=3

"----------------------------------------
" Tab系
"----------------------------------------
" 不可視文字を可視化(タブが「▸-」と表示される)
set list listchars=tab:\▸\-
" Tab文字を半角スペースにする
set expandtab
" 行頭以外のTab文字の表示幅(スペースいくつ分)
set tabstop=4
" 行頭でのTab文字の表示幅
set shiftwidth=4

"----------------------------------------
" 検索系
"----------------------------------------
" 検索文字列が小文字の場合は大文字小文字を区別なく検索する
set ignorecase
" 検索文字列に大文字が含まれている場合は区別して検索する
set smartcase
" 検索文字列入力時に順次対象文字列にヒットさせる
set incsearch
" 検索時に最後まで行ったら最初に戻る
set wrapscan
" 検索語をハイライト表示
set hlsearch

Python3

$ brew install pyenv 
$ pyenv install 3.9.2 
$ pyenv global 3.9.2 
$ mkdir -p ~/opt/python_env 
$ cd python_env; pwd 
# 上記でセットアップしたpythonを利用
# /Users/<USER>/.pyenv/versions/3.9.2/bin/python -V
$ python -m venv py392env 
$ source /Users/username/opt/python_env/py392env/bin/activate 
$ pip install numpy 
$ pip install Pillow 
$ pip install jupyter 
$ jupyter notebook --notebook-dir=~/pj/ & 
# Tabnineのインストール

xonsh

  • xonshインストール
# python仮想環境activate後に実施
$ pip install xonsh
$ pip install gnureadline

$ brew install bash-completion2
$ brew install peco

$ which xonsh
->iTerm各プロファイルのCommandを上記パスに修正
  • xonshrcの設定
# python
# source-bash source /Users/username/opt/python_env/py364env/bin/activate

# path
$PATH.append("/usr/local/bin/")

# prompt
$PROMPT = "{INTENSE_RED}{user}{INTENSE_GREEN}@{INTENSE_BLUE}{hostname}{INTENSE_YELLOW} [ {cwd} ] {GREEN}$ "

# tab
$INDENT = "    "

# completion
$COMPLETIONS_CONFIRM = True

# ls
$LS_COLORS="di=34:ln=35:so=32:pi=33:ex=31:bd=46;34:cd=43;34:su=41;30:sg=46;30:tw=42;30:ow=43;30"

# alias
aliases["lt"] = "ls -ltr"
aliases["la"] = "ls -la"
aliases["ll"] = "ls -l"

# complement command and ssh
import os
import json
from collections import OrderedDict
from operator import itemgetter
from prompt_toolkit.keys import Keys
from prompt_toolkit.filters import (Condition, IsMultiline, HasSelection, ViInsertMode)
from prompt_toolkit import print_formatted_text
from prompt_toolkit.styles import Style

@events.on_ptk_create
def custom_keybindings(bindings, **kw):
    # ctrl+rで過去の実行コマンドをpeco
    @bindings.add('c-r')
    def select_history(event):
        sess_history = $(history).split('\n')
        hist = _get_history(sess_history)
        selected = $(echo @(hist) | peco)
        event.current_buffer.insert_text(selected.strip())

    # ctrl+sでssh_config内のhost先をpeco
    @bindings.add('c-s')
    def select_ssh(event):
        hosts = _get_host(True)
        selected = $(echo @(hosts) | peco)
        if selected:
            event.current_buffer.insert_text('ssh ' + selected.strip().split(', ')[0])

    # ctrl+fで今いるディレクトリのfileをpeco
    @bindings.add('c-f')
    def select_file(event):
        r = lambda x: './'+x if os.path.isdir(x) else x
        files = '\n'.join([r(x.split(' ')[-1]) for x in $(ls -l).split('\n')])
        selected = $(echo @(files) | peco)
        event.current_buffer.insert_text(selected.strip())

inquirer_style = Style.from_dict({
    'qa': '#5F819D',
    'qu': '#FF9D00',
    'dp': '#000'
})

def _get_history(session_history=None, return_list=False):
    hist_dir = __xonsh__.env['XONSH_DATA_DIR']
    files = [ os.path.join(hist_dir,f) for f in os.listdir(hist_dir)
              if f.startswith('xonsh-') and f.endswith('.json') ]
    file_hist = []
    for f in files:
        try:
            file_hist.append(json.load(open(f))['data']['cmds'])
        except:
            pass
    cmds = [ ( c['inp'].replace('\n', ''), c['ts'][0] )
                 for cmds in file_hist for c in cmds if c]
    cmds.sort(key=itemgetter(1))
    cmds = [ c[0] for c in cmds[::-1] ]
    if session_history:
        cmds.extend(session_history)
    # dedupe
    zip_with_dummy = list(zip(cmds, [0] * len(cmds)))[::-1]
    cmds = list(OrderedDict(zip_with_dummy).keys())[::-1]
    cmds = reversed(cmds)
    if return_list:
        return cmds
    else:
        return '\n'.join(cmds)

def _get_host(color=False):
    all_text = ''
    text = ''
    for x in $(cat ~/.ssh/config).split('\n'):
        if 'LocalForward' in x:
            text += ', ' + x.strip().split(' ')[1]
        if 'HostName' in x:
            text += ', ' + x.strip().split(' ')[1]
        elif 'Host ' in x:
            if text!='':
                all_text += text + '\n'
            text = x.split(' ')[1]
    all_text += text + '\n'
    if not color:
        all_d = []
        for x in all_text.split('\n'):
            for i,y in enumerate(x.split(', ')):
                if i==0:
                    all_d.append(('class:qu', y))
                if i==1:
                    all_d.append(('', ', '))
                    all_d.append(('class:qa', y))
                    if len(x.split(', '))==2:
                        all_d.append(('','\n'))
                if i==2:
                    all_d.append(('', ', '))
                    all_d.append(('class:qp', y))
                    all_d.append(('','\n'))
        print_formatted_text(FormattedText(all_d),
                style=inquirer_style)
        return
    return all_text

aliases['host']=_get_host

Docker

VS Code

  • font 16
  • Color theme Solalized Dark-
  • [表示]  > [コマンドパレット] > install coと入力 > シェルコマンドを・・・を選択 : code <ファイル名>で開けるように設定
  • venvpathに上記でインストールしたvenvのパスを記載
  • [表示]  > [コマンドパレット] > select intと入力 > インタプリターを・・・を選択 : インタプリタを設定
  • 拡張機能

    • Vim
    • vscode-icons-mac
    • trailing spaces
    • Bracket Pair Colorizer
    • Rainbow CSV
    • Output Colorizer
    • Indenticator
    • indent-rainbow
    • Python
    • Tabnine
    • YAML
    • Remote Development
    • Terraform
    • Docker
    • Markdown All in One
  • キーボードショートカット

// Place your key bindings in this file to overwrite the defaults
[
{
    "key": "cmd+t",
    "command": "workbench.action.files.newUntitledFile" },
{
    "key": "cmd+q",
    "command": "workbench.action.closeActiveEditor"
},
{
    "key": "cmd+h",
    "command": "editor.action.startFindReplaceAction"
},
]

おわりに

  • とりあえず上記で始めて、使いながら追加していこうと思います。

fluentdとKinesisDataFirehostとlogrotateでログの収集とローテをする

はじめに

  • fluentd(EC2→CloudWatchLogs) + Kinesis Data Firehose(EC2→CloudWatchLogs → S3) でログの収集
  • logrotateでログのローテ・改廃(EC2)

fluentdでログの収集

fluentdの仕組み

  • 1つのメッセージは、[tag, time, record]で構成される
  • 流れ
    • Inputプラグイン: 各種データソースからのログにタグを付けて収集する
    • Filterプラグイン: フィルタ(データの加工)を行う
    • Outputプラグイン: 各種データ保存先へ出力する
  • ディレクトリ構成
    • /etc/td-agent/td-agent.conf
    • /var/log/td-agent/
  • ディレクティブ
      • ディレクティブで指定したプラグインを経由してログ収集を始める
      • Inputプラグインを指定
        • tail が多い
          • pos_fileオプションの利用を推奨(再起動しても続きから)
          • formatが特になければnone
      • @type: プラグイン
      • tag: 出力タグ
      • 指定したタグパターンに該当するディレクティブで、タグの書き換えや外部へのデータ出力を行う
      • ログの出力先を決めるディレクティブ
      • @type: プラグイン
        • cloudwatch_logs
          • log_stream_nameをつける代わりに use_tag_as_stream true でも良い

fluentdの設定例

  • 下記のようにS3に直接送る場合もあるが、他サービスと合わせてCloudWatchLogsからS3に送る場合もある
<source>
  @type tail
  path /var/log/httpd/access_log
  pos_file /var/log/td-agent/httpd.access.log.pos
  format apache
  tag td.apache.access
</source>

<match td.apache.**>
  @type cloudwatch_logs
  region ap-northeast-1
  auto_create_stream true
  log_stream_name ${hostname}
  log_group_name ${tag}
  retention_in_days 8
  flush_at_shutdown true
</match>

<match td.apache.**>
  @type s3
  s3_region ap-northeast-1
  s3_enpoint s3-ap-northeast-1.amazonaws.com
  s3_bucket s3-bucket-log
  path ec2/
  time_slice_format ${hostname}/${tag}/${tag}-%Y%m%d%H
  flush_at_shutdown true
</match>
  • (参考)Slack通知バージョン
<source>
  @type tail
</source>

<match td.syslog.**>
  @copy
  <store>
    @type cloudwatch_logs
  </store>

  <store>
    @type s3
  </store>

  <store>
    @type grepcounter
  </store>
</match>

<filter slack.td.syslog.**>
  @type record_transformer
</filter>

...

<match slack.**>
  @type slack
</match>

Kinesis Data Firehoseでログの転送

Kinesis Data Firehoseの仕組み

  • CloudWatchLogsからS3へLambdaでの定期実行をしてはいけない理由
    • Export taskは1アカウント/regionで同時に一つしか実行できない
    • 対象となるデータにはタイムラグがあり、ニアリアルタイムのExportができない
    • 古いログの欠落に繋がる。久しぶりに起動したEC2から古いタイムスタンプのログが送られてきたり
  • 主な構成要素
    • 配信ストリーム(Delivery Stream)
      • データ配信の単位
    • データプロデューサー
      • Kinesis Data Firehose へのデータの送信元
  • Subscription Filterとは

Kinesis Data Firehoseの設定例

  • Kinesis Data Firehose のIAMロールを作成する
  • Kinesis Data Firehose の配信ストリームを作成する
    • データ変換(Lambdaをかます)もできる
    • 配信先を指定(S3など)
    • 上記で作成したロールを指定
  • CloudWatch Logs のIAMロールを作成する
    • CloudWatch Logs が Kinesis Data Firehose にデータを送信するためのIAMロールを作成
  • CloudWatch Logs から Kinesis Data Firehose にデータを送信するためのサブスクリプションフィルタを作成する
    • log-group-name
    • filter-pattern(今回は指定なし)
    • destination-arn
    • role-arn
  • → 概要としては、 kinesis firehose delivery stream とそのIAMロールを作る + CloudWatchの subscription filter とそのIAMロールを作る

logrotateでログのローテ・改廃

logrotateの仕組み

  • /etc/logrotate.confに全ての設定を記載することも可能だが、/etc/logrotate.d以下もincludeされているので、ここにサービスごとの設定ファイルを作成し、記載する。
  • dry run
    • logrotate -dv /etc/logrotate.conf

logrotateの設定例

/var/log/sample/access.log { # 対象のログファイル
    ifempty                  # ログファイルが空でもローテーションする
    daily                    # 毎日ローテートする
    # dataext                # dailyと組み合わせ、元のファイル名 + -YYYYMMDD のファイル名で生成
    dateformat .%Y%m%d       # dateフォーマットを任意のものに変更する
    missingok                # ログファイルがなくてもエラーを出さない
    compress                 # 圧縮する
    delaycompress            # ログの圧縮作業を次回のローテーション時まで遅らせる。compressと共に指定
    rotate 10                # 10世代分古いログを残す
    # create 0644 fuga fuga  # ローテーションを行った後、代わりに空の新規ログファイルを作る。権限・グループ・ユーザを指定可能
    postrotate               # ローテート後にsyslogを再起動
        /bin/kill -HUP `cat /var/run/syslogd.pid 2> /dev/null` 2> /dev/null || true
    endscript
}

参考