Understanding TensorFlow Input Pipelines Part 1

Hello World!

Alright; so this whole input pipeline thing, in pretty much every framework, is the most undocumented thing in the universe.  So this article is about demystifying it.  We can break the process down into a few key steps:

  1. Acquire & Label Data
  2. Process Label Files for Record Conversions
  3. Process Label Files for Training a Specific Network Interface
  4. Train the Specific Network Interface

This is part 1.  We will focus on the 3rd item in this list: processing the label files into TF Records for a specific network interface.  Note you can find more associated code in the TensorFlow section of this GitHub repository: https://github.com/drcrook1/CIFAR10

Preface – What is a Network Interface?

So this is a notion that I have kinda just made up; but it makes sense to treat networks this way.  A network interface is a network that accepts a particular shape of input and produces a particular shape of output.  All the stuff that happens in the middle doesn’t matter.  For example in CIFAR10; we have 32 x 32 x 3 tensors (RGB images) which produce probabilities across 10 classes; that is, a size 10 vector.  The network interface is 32 x 32 x 3 in and 10 out.  You can have a VGG9, a VGG12, a series of MLPs for an FCN type network or whatever; who cares; it’s 32 x 32 x 3 in and 10 out.  The TFRecords produced will work for that network.
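To make that concrete, here is a minimal sketch (TF 1.x style, to match the rest of this post; the particular layers are just illustrative assumptions) of two different middles sitting behind the same interface:

import tensorflow as tf

def mlp_body(x):
    # One possible middle: flatten plus a couple of dense layers.
    flat = tf.reshape(x, [-1, 32 * 32 * 3])
    hidden = tf.layers.dense(flat, 256, activation=tf.nn.relu)
    return tf.layers.dense(hidden, 10)     # 10 outputs

def conv_body(x):
    # A different middle: a small conv stack.
    conv = tf.layers.conv2d(x, 32, 3, activation=tf.nn.relu)   # -> 30 x 30 x 32
    pool = tf.layers.max_pooling2d(conv, 2, 2)                 # -> 15 x 15 x 32
    flat = tf.reshape(pool, [-1, 15 * 15 * 32])
    return tf.layers.dense(flat, 10)       # same 10 outputs

# The interface is identical either way: 32 x 32 x 3 in, 10 out.
x = tf.placeholder(tf.float32, [None, 32, 32, 3])
logits = conv_body(x)  # swap in mlp_body(x) and nothing downstream changes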

The only other caveat is that the 10 out has to be of the particular type of thing you want out; in the CIFAR 10 instance it is probabilities for each class, not regressors of, say, 10 different things.  Even though it “fits” the correct shapes, it does not have the same soul of what the network is doing and therefore different records would be required.

Ok, that aside; we can move on 😀

Data Acquisition and Labeling

Acquisition of just ‘data’ isn’t so bad; but that high quality, high value labeled stuff is the magic sauce; that is what you need.  In this case we grab the data from Kaggle.  I like this variation of the CIFAR 10 data set because it is way more realistic than all the other samples out there which do all of these steps for you already.  Please start with this version of the data set; otherwise you will not actually learn how the input pipelines work.

https://www.kaggle.com/c/cifar-10/data

Process Labels for Record Conversion

You can read all about getting the CIFAR 10 data set into a format for the machine to do the actual work here: http://dacrook.com/prepping-label-files-for-ml-training-on-specific-machine/ I’ll need to do a follow up on this using a Spark cluster on HDInsight; because that is way more realistic.

Process Labels for Network Interface

Alright; here we go: some new stuff.  Remember, the interface we are targeting takes a 32 x 32 x 3 input where the inputs are float32 and it produces a size 10 vector of probabilities in float32.  Once we have records like this, we can iterate on various models, parameters, etc.

KEY NOTE: Ensure that you zero index your labels.  So if you have 10 classes, your labels will be 0-9 and not 1-10.  If you use 1-10 you will get nan gradients and debug forever until you finally figure out that your labels are incorrect.
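If your labels came in 1-indexed, the fix is a one-liner in pandas before you write any records (this assumes the same ‘label’ column used later in this post):

# Shift 1-10 labels down to 0-9 before writing records.
labels['label'] = labels['label'] - 1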

Core Function Definitions

So I like to break my code into a few parts.  First we need to create the underlying function definitions that do the real work.  The functions perform the following tasks:

  1. Create a raw byte list entry in the record
  2. Create an integer list entry in the record
  3. Actually write a record to disk
  4. Convert an image to a byte string
  5. Control the flow of reading from the label file and writing to disk

Import Statements

import math
import os
import tensorflow as tf
import pandas as pd
import numpy as np
from scipy import misc
import CONSTANTS

CONSTANTS is the only import defined by us; I like to keep shared constants across files in a single file so I can make changes in a single place.  Think of it like a configuration file.  Here is CONSTANTS.py:

#!/usr/bin/env python3
"""
Author: David Crook
Copyright Microsoft Corporation 2017
"""

DEBUG = True

IMAGE_SHAPE = (32, 32, 3)
NUM_CLASSES = 10

INPUT_PIPELINE_THREADS = 8
#batch size * minibatches = # samples in data set or greater.
BATCH_SIZE = 1000
MINI_BATCHES = 50
EPOCHS = 200
LEARNING_RATE = 1e-3
N_CLASSES = 10

r_bdir = 'C:/data/cifar_10/tfrecords/'
RecordPaths = [
    r_bdir + '1.tfrecords',
    r_bdir + '2.tfrecords',
    r_bdir + '3.tfrecords',
    r_bdir + '4.tfrecords'
]

Create Entries for Records

def bytes_feature(value):
    '''
    Creates a TensorFlow Record Feature with value as a byte array
    '''
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def int64_feature(value):
    '''
    Creates a TensorFlow Record Feature with value as a 64 bit integer.
    '''
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

These are pretty straightforward; they simply wrap the TensorFlow commands.  The only thing to really note is that everything is a list.  So we will write the raw image with bytes_feature and we will write the single label value (as a list) with int64_feature.

WHY NOT ONE HOT ENCODE? -> TensorFlow has a sparse cross entropy loss function we will use that works pretty well.  You can one hot encode if you like but it is not necessary.
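For reference, here is roughly what that loss looks like at training time; a sketch only, where logits is assumed to come from your network and labels from your record reader:

# labels: int64 class ids in [0, 9]; logits: float32, shape [batch, 10].
# The sparse variant takes raw class ids, so no one hot encoding is needed.
loss = tf.reduce_mean(
    tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=labels, logits=logits))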

Write Record to Disk

Alright; what are we doing here?  Well, we pass a path; this is the actual full path that will be written, such as C:/records/recordA.tfrecord.  We also take in a dataframe which has an image column and a label column.  The image column is a properly formatted byte string and the label is an int64.

def write_record(dest_path, df):
    '''
    Writes an actual TF record from a data frame
    '''
    writer = tf.python_io.TFRecordWriter(dest_path)
    for i in range(len(df)):
        example = tf.train.Example(features=tf.train.Features(feature={
            'example': bytes_feature(df['image'][i]),
            'label': int64_feature(df['label'][i])
        }))
        writer.write(example.SerializeToString())
    writer.close()

Read Image from Path to Byte String

This is where much of the magic happens.  Also notice that we do a little bit of pre-processing here.  I know the min value for a pixel is 0 and the max is 255; so a quick trick is to just divide the whole tensor by 255 to normalize the image.  Do this to all images.  You can verify it renders the same using matplotlib if you like.

def read_image_to_bytestring(path):
    '''
    Reads an image from a path and converts it
    to a flattened byte string
    '''
    img = misc.imread(path).astype(np.float32) / 255.0
    return img.reshape(CONSTANTS.IMAGE_SHAPE).flatten().tostring()

So we use scipy’s misc (make sure you install PIL if you didn’t install the anaconda stack) to read the path as a numpy array, and we force it to float32 (this makes reading into the network work properly later, since our kernel variables are all of type float32).
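If you want to do the matplotlib sanity check mentioned above, something like this works (the image path here is hypothetical; imshow treats float RGB arrays as values in [0, 1]):

import matplotlib.pyplot as plt
# Uses the same np / misc imports as the module above.
img = misc.imread('C:/data/cifar_10/train/1.png').astype(np.float32) / 255.0
plt.imshow(img)   # should render identically to the un-normalized original
plt.show()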

Divide by 255 for some quick normalization; we then pull in IMAGE_SHAPE from our constants (32, 32, 3), flatten it and push it to a byte string.  Technically you may not need to flatten; but I just don’t trust it, I want it flattened so I can reshape on the other side.
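For a preview of why the flatten round-trips safely (the read side is part 2’s topic), here is a sketch; serialized_example is assumed to come from a record reader:

# tostring() and decode_raw are exact inverses, and the reshape
# restores the original 32 x 32 x 3 layout.
features = tf.parse_single_example(serialized_example, features={
    'example': tf.FixedLenFeature([], tf.string),
    'label': tf.FixedLenFeature([], tf.int64)
})
image = tf.decode_raw(features['example'], tf.float32)
image = tf.reshape(image, CONSTANTS.IMAGE_SHAPE)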

Control Flow and Write TF Records from File Path

So this code takes in the file we built in the previous post, a folder we want to drop the records into and the number of records we want.  The reason we take num_records as a parameter is so that we can distribute images across multiple records in the scenario where we want to distribute the training across multiple machines and have the weights averaged on a central node.  You may or may not need to do this; but it doesn’t do any harm to feed a single training node multiple records (just ensure you use extra input threads to bump your performance if you do this).

def write_records_from_file(labels_file, dest_folder, num_records):
    '''
    Takes a label file as a path and converts entries into a tf record
    for image classification.
    '''
    if not os.path.exists(dest_folder):
        os.makedirs(dest_folder)
    labels = pd.read_csv(labels_file)
    #read image, flatten and then convert to a string
    img_arrs = [read_image_to_bytestring(path) for path in labels['examplepath']]
    labels['image'] = pd.Series(img_arrs)
    start_idx = 0
    ex_per_rec = math.ceil(len(labels) / num_records)
    for i in range(1, num_records):
        rec_path = dest_folder + str(i) + '.tfrecords'
        write_record(rec_path, labels.loc[start_idx:(ex_per_rec * i) - 1].reset_index())
        start_idx += ex_per_rec
        print('wrote record: ', i)
    final_rec_path = dest_folder + str(num_records) + '.tfrecords'
    write_record(final_rec_path, labels.loc[ex_per_rec * (num_records - 1):].reset_index())
    print('wrote record: ', num_records)
    print('finished writing records...')

First thing we do is double check that the destination folder exists and, if not, create it (Python will crash when writing if the folder doesn’t exist).  Next up, read in the csv file.

We then use a list comprehension with read_image_to_bytestring to process each path, generating the image arrays as byte strings which are properly formatted and normalized for writing to tf records.  We push those into a pd.Series and then into a new column ‘image’; the reason we push to a Series first is simply to keep pandas happy.

Use math.ceil (not math.floor) when computing ex_per_rec.  With floor, the leftover fraction from every record piles up in the final record, which can end up much larger than you intended; with ceil, each of the earlier records is at most one example larger and the final record simply takes whatever remains, so it comes out the same size or smaller.

We then slice the dataframe up, reset the index (if you don’t reset the index you get key errors, since write_record indexes rows from 0) and push each slice into the write_record function.
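As a quick sanity check of the arithmetic, assume the standard 50,000 image CIFAR 10 training set split across 4 records:

import math
ex_per_rec = math.ceil(50000 / 4)   # 12500
# The loop writes records 1-3 covering rows 0-12499, 12500-24999
# and 25000-37499; the final record picks up rows 37500-49999.
# Every record holds exactly 12500 examples in this case.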

The App Entry Point

So to enter the application to actually write this stuff; it is defined here:

#!/usr/bin/env python3
"""
Author: David Crook
Copyright Microsoft Corporation 2017
"""
import PreProcess

LABELS_FILE = 'C:/data/cifar_10/proc_train_labels.csv'
TF_REC_DEST = 'C:/data/cifar_10/tfrecords/'

def main():
    '''
    Main function which converts a label file into tf records
    '''
    PreProcess.write_records_from_file(LABELS_FILE, TF_REC_DEST, 4)

if __name__ == "__main__":
    main()

So here we are going to create 4 tf records; we are reading the csv from that location and writing the records to that particular directory.  We will end up with 1.tfrecords, 2.tfrecords, 3.tfrecords and 4.tfrecords.

Summary

OK, so that was a ton of ground to cover.  We focused just on writing TF Records.  Please ask questions in the comments section about anything you need help understanding.

ALL OF THE CODE FILES

CONSTANTS.py

#!/usr/bin/env python3
"""
Author: David Crook
Copyright Microsoft Corporation 2017
"""

DEBUG = True

IMAGE_SHAPE = (32, 32, 3)
NUM_CLASSES = 10

INPUT_PIPELINE_THREADS = 8
#batch size * minibatches = # samples in data set or greater.
BATCH_SIZE = 1000
MINI_BATCHES = 50
EPOCHS = 200
LEARNING_RATE = 1e-3
N_CLASSES = 10

r_bdir = 'C:/data/cifar_10/tfrecords/'
RecordPaths = [
    r_bdir + '1.tfrecords',
    r_bdir + '2.tfrecords',
    r_bdir + '3.tfrecords',
    r_bdir + '4.tfrecords'
]

PreProcess.py

'''
Author: David Crook
Module which contains functions for pre-processing image data
'''
import math
import os
import tensorflow as tf
import pandas as pd
import numpy as np
from scipy import misc
import CONSTANTS

def bytes_feature(value):
    '''
    Creates a TensorFlow Record Feature with value as a byte array
    '''
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def int64_feature(value):
    '''
    Creates a TensorFlow Record Feature with value as a 64 bit integer.
    '''
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def write_record(dest_path, df):
    '''
    Writes an actual TF record from a data frame
    '''
    writer = tf.python_io.TFRecordWriter(dest_path)
    for i in range(len(df)):
        example = tf.train.Example(features=tf.train.Features(feature={
            'example': bytes_feature(df['image'][i]),
            'label': int64_feature(df['label'][i])
        }))
        writer.write(example.SerializeToString())
    writer.close()

def read_image_to_bytestring(path):
    '''
    Reads an image from a path and converts it
    to a flattened byte string
    '''
    img = misc.imread(path).astype(np.float32) / 255.0
    return img.reshape(CONSTANTS.IMAGE_SHAPE).flatten().tostring()

def write_records_from_file(labels_file, dest_folder, num_records):
    '''
    Takes a label file as a path and converts entries into a tf record
    for image classification.
    '''
    if not os.path.exists(dest_folder):
        os.makedirs(dest_folder)
    labels = pd.read_csv(labels_file)
    #read image, flatten and then convert to a string
    img_arrs = [read_image_to_bytestring(path) for path in labels['examplepath']]
    labels['image'] = pd.Series(img_arrs)
    start_idx = 0
    ex_per_rec = math.ceil(len(labels) / num_records)
    for i in range(1, num_records):
        rec_path = dest_folder + str(i) + '.tfrecords'
        write_record(rec_path, labels.loc[start_idx:(ex_per_rec * i) - 1].reset_index())
        start_idx += ex_per_rec
        print('wrote record: ', i)
    final_rec_path = dest_folder + str(num_records) + '.tfrecords'
    write_record(final_rec_path, labels.loc[ex_per_rec * (num_records - 1):].reset_index())
    print('wrote record: ', num_records)
    print('finished writing records...')    

WriteTFRecords.py

#!/usr/bin/env python3
"""
Author: David Crook
Copyright Microsoft Corporation 2017
"""
import PreProcess

LABELS_FILE = 'C:/data/cifar_10/proc_train_labels.csv'
TF_REC_DEST = 'C:/data/cifar_10/tfrecords/'

def main():
    '''
    Main function which converts a label file into tf records
    '''
    PreProcess.write_records_from_file(LABELS_FILE, TF_REC_DEST, 4)

if __name__ == "__main__":
    main()
