(2022-09-19 18:33)

Pipeline Class

A Pipeline is an object containing a list of transformation steps.

Creating Pipeline

Let's create a pipeline using the Python interface:

from frictionless import Pipeline, transform, steps

pipeline = Pipeline(steps=[steps.table_normalize(), steps.table_melt(field_name='name')])
print(pipeline)
{'steps': [{'type': 'table-normalize'},
           {'type': 'table-melt', 'fieldName': 'name'}]}

Running Pipeline

To run a pipeline, pass it to the transform function or to a transform method:

from frictionless import Pipeline, transform, steps

pipeline = Pipeline(steps=[steps.table_normalize(), steps.table_melt(field_name='name')])
resource = transform('table.csv', pipeline=pipeline)
print(resource.schema)
print(resource.read_rows())
{'fields': [{'name': 'name', 'type': 'string'},
            {'name': 'variable', 'type': 'string'},
            {'name': 'value', 'type': 'any'}]}
[{'name': 'english', 'variable': 'id', 'value': 1}, {'name': '中国人', 'variable': 'id', 'value': 2}]
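To make the output above easier to follow, here is a minimal pure-Python sketch of what a melt (unpivot) does to the rows. It does not use Frictionless: the function name `melt_rows` is hypothetical, and the sample rows are an assumption about the contents of table.csv inferred from the printed result.

```python
def melt_rows(rows, field_name):
    """Unpivot rows: keep `field_name` and emit one output row
    for every other column, as a variable/value pair."""
    melted = []
    for row in rows:
        for key, value in row.items():
            if key == field_name:
                continue  # the id field is kept, not melted
            melted.append(
                {field_name: row[field_name], "variable": key, "value": value}
            )
    return melted


# Assumed contents of table.csv (inferred from the output above)
rows = [
    {"id": 1, "name": "english"},
    {"id": 2, "name": "中国人"},
]

melted = melt_rows(rows, field_name="name")
for row in melted:
    print(row)
```

Each input row produces one output row per column other than the one named by field_name, which is why the two-column sample yields exactly one row per input row.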

Transform Steps

The Step concept is a part of the Transform API. You can create a custom Step to be used as part of resource or package transformation.

The following example step uses PETL under the hood.

from frictionless import Step

class cell_set(Step):
    code = "cell-set"

    def __init__(self, descriptor=None, *, value=None, field_name=None):
        self.setinitial("value", value)
        self.setinitial("fieldName", field_name)
        super().__init__(descriptor)

    def transform_resource(self, resource):
        value = self.get("value")
        field_name = self.get("fieldName")
        yield from resource.to_petl().update(field_name, value)
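As a rough illustration of the effect of this step, the sketch below sets one field to a fixed value in every row, which is what PETL's update does to a table. It is pure Python and does not call Frictionless or PETL; the helper name `set_cell` and the sample rows are hypothetical.

```python
def set_cell(rows, field_name, value):
    """Yield copies of the rows with `field_name` set to `value`."""
    for row in rows:
        updated = dict(row)  # copy so the input rows are not mutated
        updated[field_name] = value
        yield updated


# Hypothetical sample data
rows = [
    {"id": 1, "name": "english"},
    {"id": 2, "name": "中国人"},
]

result = list(set_cell(rows, field_name="name", value="n/a"))
for row in result:
    print(row)
```

Like the step above, the helper is a generator, so rows are produced lazily as the consumer iterates.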

Reference

Pipeline (class)

Step (class)

Pipeline (class)

Pipeline representation

Signature

(*, name: Optional[str] = None, title: Optional[str] = None, description: Optional[str] = None, steps: List[Step] = NOTHING) -> None

Parameters

  • name (Optional[str])
  • title (Optional[str])
  • description (Optional[str])
  • steps (List[Step])

pipeline.name (property)

Optional name of the pipeline

Signature

Optional[str]

pipeline.title (property)

Optional human-readable title of the pipeline

Signature

Optional[str]

pipeline.description (property)

Optional description of the pipeline

Signature

Optional[str]

pipeline.steps (property)

List of transform steps

Signature

List[Step]

pipeline.step_types (property)

List of the types of the steps

Signature

List[str]

pipeline.add_step (method)

Add a new step to the pipeline

Signature

(step: Step) -> None

Parameters

  • step (Step)

pipeline.clear_steps (method)

Remove all the steps

Signature

() -> None

pipeline.get_step (method)

Get step by type

Signature

(type: str) -> Step

Parameters

  • type (str)

pipeline.has_step (method)

Check if a step is present

Signature

(type: str) -> bool

Parameters

  • type (str)

pipeline.remove_step (method)

Remove step by type

Signature

(type: str) -> Step

Parameters

  • type (str)

pipeline.set_step (method)

Set step by type

Signature

(step: Step) -> Optional[Step]

Parameters

  • step (Step)
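
The step-management methods above all address steps by their type. The toy model below sketches one plausible reading of those signatures; it is not the library's implementation. `ToyStep` and `ToyPipeline` are hypothetical stand-ins, and the error raised for a missing step and the replace-in-place behaviour of set_step are assumptions.

```python
from typing import List, Optional


class ToyStep:
    """Hypothetical stand-in for a step: only carries a type string."""

    def __init__(self, type: str, label: str = ""):
        self.type = type
        self.label = label


class ToyPipeline:
    """Hypothetical stand-in mirroring the method names listed above."""

    def __init__(self, steps: Optional[List[ToyStep]] = None):
        self.steps: List[ToyStep] = list(steps or [])

    @property
    def step_types(self) -> List[str]:
        return [step.type for step in self.steps]

    def add_step(self, step: ToyStep) -> None:
        self.steps.append(step)

    def clear_steps(self) -> None:
        self.steps.clear()

    def has_step(self, type: str) -> bool:
        return type in self.step_types

    def get_step(self, type: str) -> ToyStep:
        for step in self.steps:
            if step.type == type:
                return step
        # Assumption: the real library may raise a different error type
        raise KeyError(type)

    def remove_step(self, type: str) -> ToyStep:
        step = self.get_step(type)
        self.steps.remove(step)
        return step

    def set_step(self, step: ToyStep) -> Optional[ToyStep]:
        # Assumption: replace a step of the same type in place and
        # return the previous one; otherwise append and return None
        if self.has_step(step.type):
            previous = self.get_step(step.type)
            self.steps[self.steps.index(previous)] = step
            return previous
        self.add_step(step)
        return None


pipeline = ToyPipeline([ToyStep("table-normalize")])
pipeline.add_step(ToyStep("table-melt", label="first"))
replaced = pipeline.set_step(ToyStep("table-melt", label="second"))
removed = pipeline.remove_step("table-normalize")
print(pipeline.step_types)
```

In this model, set_step returning Optional[Step] is read as "the step that was replaced, if any", which matches the signature but is not confirmed by the reference text.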

Step (class)

Step representation

Signature

(*, title: Optional[str] = None, description: Optional[str] = None) -> None

Parameters

  • title (Optional[str])
  • description (Optional[str])

step.type (property)

Type identifier of the step, as it appears in a descriptor (for example, 'table-normalize')

Signature

ClassVar[str]

step.title (property)

Optional human-readable title of the step

Signature

Optional[str]

step.description (property)

Optional description of the step

Signature

Optional[str]

step.transform_package (method)

Transform package

Signature

(package: Package)

Parameters

  • package (Package): package

step.transform_resource (method)

Transform resource

Signature

(resource: Resource)

Parameters

  • resource (Resource): resource

This page documents a beta version of Frictionless Framework (v5). For the version that pip currently installs by default, read the Frictionless Framework (v4) docs.