DataVec
Konduit Serving supports data transformations defined by the DataVec vectorization and ETL library.
from konduit import TransformProcessStep, ServingConfig
from konduit.server import Server
from konduit.client import Client
from konduit.utils import is_port_in_use
from pydatavec import Schema, TransformProcess
from utils import load_java_tp
import numpy as np
import random
import time
import json
import os
DataVec transformations can be defined in Python using the PyDataVec package, which can be installed from PyPI:
pip install pydatavec
Using PyDataVec requires Docker. For Windows 10 Home edition users, note that Docker Toolbox is not supported.
Run the following cell to check that your Docker installation is successful:
!docker run hello-world
Hello from Docker!
This message shows that your installation appears to be working correctly.
To generate this message, Docker took the following steps:
1. The Docker client contacted the Docker daemon.
2. The Docker daemon pulled the "hello-world" image from the Docker Hub.
(amd64)
3. The Docker daemon created a new container from that image which runs the
executable that produces the output you are currently reading.
4. The Docker daemon streamed that output to the Docker client, which sent it
to your terminal.
To try something more ambitious, you can run an Ubuntu container with:
$ docker run -it ubuntu bash
Share images, automate workflows, and more with a free Docker ID:
https://hub.docker.com/
For more examples and ideas, visit:
https://docs.docker.com/get-started/
Data transformations with PyDataVec
Schema (source)
A Schema specifies the structure of your data. In DataVec, a TransformProcess requires the Schema of the data to be specified.
Schema objects have a number of methods that define different data types for columns: add_string_column(), add_integer_column(), add_long_column(), add_float_column(), add_double_column() and add_categorical_column().
TransformProcess (source)
TransformProcess provides a number of methods to manipulate your data. The following methods are available in the Python API:
Reduce the number of rows: filter()
General data transformations: replace()
Type casting: string_to_time(), derive_column_from_time(), categorical_to_integer()
Combining/reducing the values in each column: reduce()
String operations: append_string(), lower(), upper(), concat(), remove_white_spaces(), replace_empty_string(), replace_string(), map_string()
Column selection/renaming: remove(), remove_columns_except(), rename_column()
One-hot encoding: one_hot()
In this short example, we append the string "two" to the end of values in the string column "first".
schema = Schema()
schema.add_string_column("first")
tp = TransformProcess(schema)
tp.append_string("first", "two")
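To make the effect of this transform concrete, here is a minimal plain-Python stand-in that does the same thing to a single record. It does not use DataVec; the `append_string` function below is a hypothetical helper written for illustration and is not the PyDataVec method of the same name.

```python
def append_string(record, column, suffix):
    """Return a copy of `record` with `suffix` appended to the value in `column`."""
    updated = dict(record)  # copy so the caller's record is left untouched
    updated[column] = str(updated[column]) + suffix
    return updated


record = {"first": "value"}
result = append_string(record, "first", "two")
print(result)  # {'first': 'valuetwo'}
```

This is the same input and output that the served transform produces at the end of this page.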
The TransformProcess configuration has to be converted into JSON format to be passed to Konduit Serving.
java_tp = tp.to_java()
tp_json = java_tp.toJson()
load_java_tp(tp_json)
as_python_json = json.loads(tp_json)
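Before handing the dictionary to Konduit Serving, it can help to look at what it contains. The sketch below parses a JSON string with the standard json module and lists the schema columns and the transforms applied. The string is a stand-in copied from the transformProcesses entry of the server configuration printed on this page, under the assumption that it matches what toJson() returns; `summarize` is a hypothetical helper.

```python
import json

# Stand-in for the string returned by java_tp.toJson() (assumed shape,
# taken from the server configuration printed on this page).
tp_json = """
{
  "actionList": [
    {"transform": {
      "@class": "org.datavec.api.transform.transform.string.AppendStringColumnTransform",
      "columnName": "first",
      "toAppend": "two"}}
  ],
  "initialSchema": {
    "@class": "org.datavec.api.transform.schema.Schema",
    "columns": [
      {"@class": "org.datavec.api.transform.metadata.StringMetaData",
       "name": "first"}
    ]
  }
}
"""

as_python_json = json.loads(tp_json)


def summarize(tp_dict):
    """Return (column names, list of (action kind, class name, column))."""
    columns = [col["name"] for col in tp_dict["initialSchema"]["columns"]]
    actions = []
    for action in tp_dict["actionList"]:
        for kind, body in action.items():
            short_class = body["@class"].rsplit(".", 1)[-1]
            actions.append((kind, short_class, body.get("columnName")))
    return columns, actions


columns, actions = summarize(as_python_json)
print(columns)
print(actions)
```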
Configure the step
The TransformProcess can now be defined in the Konduit Serving configuration with a TransformProcessStep. Here, we:
configure the inputs and outputs: the schema, column names and data types should be defined here.
declare the TransformProcess using the .transform_process() method.
Note that Schema data types are not defined in the same way as PythonStep data types. See the source for a complete list of supported Schema data types:
NDArray
String
Boolean
Categorical
Float
Double
Integer
Long
Bytes
You should define the Schema data types in TransformProcessStep() as strings.
transform_step = (TransformProcessStep()
                  .set_input(schema=None,
                             column_names=["first"],
                             types=["String"])
                  .set_output(schema=None,
                              column_names=["first"],
                              types=["String"])
                  .transform_process(as_python_json))
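Because the types are passed as plain strings, a typo only surfaces later. The sketch below is one way to check the column names and types before building the step. It is plain Python, not part of Konduit Serving: `check_columns` is a hypothetical helper, the set of names is copied from the list above, and the requirements that both lists have the same length and that spelling matches the list exactly are assumptions of this sketch.

```python
# Type names as listed on this page; exact spelling is assumed to matter.
SUPPORTED_TYPES = {
    "NDArray", "String", "Boolean", "Categorical",
    "Float", "Double", "Integer", "Long", "Bytes",
}


def check_columns(column_names, types):
    """Validate names/types and return them paired as a dict."""
    if len(column_names) != len(types):
        raise ValueError(
            f"{len(column_names)} column names but {len(types)} types"
        )
    unknown = [t for t in types if t not in SUPPORTED_TYPES]
    if unknown:
        raise ValueError(f"unsupported Schema data types: {unknown}")
    return dict(zip(column_names, types))


checked = check_columns(["first"], ["String"])
print(checked)  # {'first': 'String'}
```

The same lists can then be passed to set_input() and set_output() as in the step defined above.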
Configure the server
Configure the server using ServingConfig, which sets the port through the http_port argument and the data formats through the input_data_format and output_data_format arguments.
# Ports below 1024 usually require elevated privileges, so start at 1024.
port = np.random.randint(1024, 65535)
serving_config = ServingConfig(
    http_port=port,
    input_data_format='JSON',
    output_data_format='JSON',
)
server = Server(
    serving_config=serving_config,
    steps=[transform_step]
)
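A randomly drawn port can collide with one that is already in use. The sketch below shows one way to check a port, or to ask the operating system for a free one, using only Python's standard socket module. It is not the konduit.utils.is_port_in_use helper imported at the top of this page, whose signature is not shown here; `port_is_free` and `pick_free_port` are hypothetical names.

```python
import socket


def port_is_free(port, host="127.0.0.1"):
    """Return True if a TCP socket can currently be bound to host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
        return True


def pick_free_port(host="127.0.0.1"):
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


candidate = pick_free_port()
print(candidate, port_is_free(candidate))
```

Note that either check is only a snapshot: another process can still take the port between the check and the moment the server starts.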
The complete configuration is as follows:
server.config.as_dict()
{'@type': 'InferenceConfiguration',
'steps': [{'@type': 'TransformProcessStep',
'inputSchemas': {'default': ['String']},
'outputSchemas': {'default': ['String']},
'inputNames': ['default'],
'outputNames': ['default'],
'inputColumnNames': {'default': ['first']},
'outputColumnNames': {'default': ['first']},
'transformProcesses': {'default': {'actionList': [{'transform': {'@class': 'org.datavec.api.transform.transform.string.AppendStringColumnTransform',
'columnName': 'first',
'toAppend': 'two'}}],
'initialSchema': {'@class': 'org.datavec.api.transform.schema.Schema',
'columns': [{'@class': 'org.datavec.api.transform.metadata.StringMetaData',
'name': 'first'}]}}}}],
'servingConfig': {'@type': 'ServingConfig',
'httpPort': 47964,
'inputDataFormat': 'JSON',
'outputDataFormat': 'JSON',
'logTimings': True}}
Start the server
server.start()
Configure the client
Create a Client object and specify the port number as an argument:
client = Client(port=port)
Starting server..
Server has started successfully.
Inference
Finally, we send a request to the running Konduit Serving instance. Recall that the TransformProcessStep appends the string "two" to strings in the column "first":
data_input = {'first': 'value'}
predicted = client.predict(data_input)
print(predicted)
server.stop()
{'first': 'valuetwo'}