タグ

ブックマーク / qiita.com/hakobera (6)

  • Ruby と tumugi によるデータパイプライン構築 - Qiita

    データを扱う仕事をしていると、何らかの形でデータをある順序に沿って処理する Workflow = データパイプラインを構築する必要になる場面に出くわすことが多いと思います。 こうした処理に適用できるソフトウェアは既にいくつか存在するのですが、自分が仕事で使う上でマッチするものがなかったので、tumugi を Ruby のライブラリを作成し、 gem として公開しました。ロゴはこんな感じ。 Workflow Engine をつくろう! シリーズ連載は、元々このライブラリの設計を整理するために書いていた記事でした。まだ、未完ですが興味のある方はそちらもお読みください。 注釈 この記事は tumugi が Python で書かれたワークフローエンジンである Luigi に強く影響されて書いたことから、対比として分かりやすいため Building Data Pilelines with Pytho

    Ruby と tumugi によるデータパイプライン構築 - Qiita
  • tumugi で BigQuery のクエリ結果を Google Sheets に保存し、URL を Slack に通知する方法 - Qiita

    tumugi で BigQuery のクエリ結果を Google Sheets に保存し、URL を Slack に通知する方法RubyGoogleDriveBigQuerytumugi 仕事で BigQuery のクエリの結果を共有用のフォルダに Google Sheets 形式で保存して、Slack で URL を共有する、ということを頻繁にやっているのですが、 WebUI だと、クエリ結果を Google Sheets 形式で保存できるけど、ファイル名とフォルダが指定できないので、いちいち名前を変えて、フォルダを移動しないといけない。Slack 通知はできない。 Google Apps Scirpt (GAS) だと、全部実現できるけど、コードの管理を GitHub でやるのが面倒 と定形処理なのに、イマイチ効率の良い方法がなかったのを、tumugi で Ruby スクリプト化しまし

    tumugi で BigQuery のクエリ結果を Google Sheets に保存し、URL を Slack に通知する方法 - Qiita
  • Workflow Engine をつくろう! Part 1(Task の依存関係の解決) - Qiita

    Part 1 Task の依存関係の解決 Part 2 Workflow の冪等性 Part 3 Task 間でのデータのやり取り Part 4 Task の並列実行 Workflow Engine って何? Workflow Engine と言っても多機能なものから、シンプルなものまで様々なものがあります。そこで、主旨がぶれないように、この記事での Workflow Engine は、以下の要件を満たすソフトウェアとします。 Workflow Engine とは、依存関係のある複数の Task を、意図した順番通りに実行するもの この記事では、この要件を満たす Workflow Engine を Ruby でつくる方法について解説します。 依存関係を記述する 依存関係を解決するコードを書く前に、依存関係を記述する方法をまず決めましょう。 依存関係を記述するには、luigi のように Ta

    Workflow Engine をつくろう! Part 1(Task の依存関係の解決) - Qiita
  • Workflow Engine をつくろう! Part 3(Task 間でのデータのやり取り) - Qiita

    今回は Task 間でのデータのやり取りを実装していきます。次回以降が少し長くなるのと、実装が簡単なので、今回は短めにまとめます。 Task 間のデータのやり取りとは? 「Task 間のデータのやり取り」とは何かを、いつもの例で説明します。 「Task 間のデータのやり取り」というのは、正確には「下流(Downstream)Task の output メソッドの戻り値を、上流(Upstream)Task が参照できる」機能のことです。上の図でいうと、TaskB, TaskC は TaskD の、TaskA は TaskB, TaskC の output メソッドの戻り値である Target クラスのインスタンスを参照できます。この機能を持つメソッドを、input メソッドとして追加します。 すると TaskD#output == TaskB#input TaskD#output == Ta

    Workflow Engine をつくろう! Part 3(Task 間でのデータのやり取り) - Qiita
  • Workflow Engine をつくろう! Part 4(Task の並列実行) - Qiita

    これらは排他的ではないので、組み合わせて利用することも可能です。今回は、最もスケールしない1が、最も実装が簡単な Thread を利用して実装していきます。 実装方針 並列実行を処理の概要は以下のようになります。 依存関係に基づいて、Task ソートする。ソートした、配列を Task Queue とする Task 実行用の Worker スレッドを指定した数だけ用意する Worker スレッド内で、Task Queue から Task を取り出す Task が実行可能になるまで待つ Task を実行する Task クラスの変更 並列実装をするにあたって、Task が実行可能状態にあるのかを判定する必要があります。これまでの説明から、「Task が実行可能である」というのは「依存する全ての Task が完了している」ことであり、そして、「Task が完了している」というのは、「出力となる T

    Workflow Engine をつくろう! Part 4(Task の並列実行) - Qiita
  • Workflow Engine をつくろう! Part 2 (Workflow の冪等性) - Qiita

    Part 1 Task の依存関係の解決 Part 2 Workflow の冪等性 Part 3 Task 間でのデータのやり取り Part 4 Task の並列実行 まえおき この連載記事で作成する Workflow Engine は、Luigi の設計思想に大きく影響を受けています。なので、 @k24d さんの Luigi によるワークフロー管理 を先に読んでおくと、理解が深まると思います。 前回は Task の依存関係の解決方法を実装しましたが、Part 2の今回は Workflow の冪等性について実装していきます。 Workflow を実行、制御する上で、冪等性(何回実行しても結果が同じであること)は大変重要です。なぜ重要かというと、現実問題として Workflow は途中で失敗する可能性があり、いかに確実に簡単にリトライ処理ができるかが、業務システムを運用の手間に直結するからで

    Workflow Engine をつくろう! Part 2 (Workflow の冪等性) - Qiita
  • 1