This chapter contains a full example of PyDaF usage. The example implements a word counter using a Map/Reduce algorithm. In addition, it harnesses processor specificity to modify the parts generated by the splitting code before they are mapped. Finally, another processor removes all words of fewer than three characters from the generated dictionary.
We will use a threaded pipeline as it is more appropriate for Map/Reduce algorithms. The input of a job will be a List data unit with the input-files flag, containing the list of files from which words are to be counted. We will expect a Dict with the result flag to be produced as a result of the pipeline.
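As a quick preview of this contract (the actual job-submission code appears at the end of the chapter, and the file name below is only an illustration), the input and the requested result could look like this:

# The job's input: a List unit carrying the 'input-files' flag.
l = List( [ 'somefile.txt' ] )
l.pydaf.set_flag( 'input-files' )

# The result we ask the pipeline for: a Dict unit with the 'result' flag.
wanted = S( Dict , flags = [ 'result' ] )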
The complete source code for this example is available in the PyDaF distribution; it can be found in the examples/specificity/ directory.
The first thing we will need is the Map/Reduce class; this class will implement the actual splitting and word counting of the input. It will retire its input data, as the list of files is no longer required once the files have been read. Parts will be strings containing up to 100 lines from an input file. Finally, the generated dictionary will carry the wordcount flag.
First, we need to define the Map/Reduce implementation's metadata:
class WordCounter( pydaf.MapReduce ):

    class Meta:
        name = 'wcount'
        retire_input = True
        in_type = S( List , flags = ( 'input-files' , ) , to = 'files' )
        part_type = String
        output_flag = 'wordcount'
This being done, we can define the splitting code. We must be careful not to generate parts that contain no text at all; such parts would not cause errors, but processing them would be pointless.
def split( self , input , output ):
    max_len = 100
    for f_name in input.files[ 0 ]:
        f = open( f_name )
        try:
            f_split = [ ]
            for line in f:
                f_split.append( line )
                # Emit a part as soon as it holds max_len lines.
                if len( f_split ) >= max_len:
                    txt = String( ''.join( f_split ) )
                    output.append( txt )
                    f_split = [ ]
            # Emit whatever remains, but never an empty part.
            if len( f_split ):
                txt = String( ''.join( f_split ) )
                output.append( txt )
        finally:
            f.close( )
The resulting processor will generate a set of String units with the split-wcount flag set, as well as the incomplete output dictionary and parts counter.
The mapping code is simple enough:
def map( self , input , output ):
    for word in input.split( ):
        if word not in output:
            output[ word ] = 1
        else:
            output[ word ] = output[ word ] + 1
So is the reducing code, which simply adds each part's counts to the final dictionary.
def reduce( self , input , output ):
    for word , count in input.items( ):
        if word in output:
            count = count + output[ word ]
        output[ word ] = count
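To make the merging step concrete, here is a minimal stand-alone illustration of the same logic using plain Python dictionaries instead of PyDaF data units (the word counts are made up for the example):

# A partial count produced by one part, and the final dictionary so far.
partial = { 'the': 3 , 'cat': 1 }
final = { 'the': 5 , 'dog': 2 }

for word , count in partial.items( ):
    if word in final:
        count = count + final[ word ]
    final[ word ] = count

# final now maps 'the' to 8, 'dog' to 2 and 'cat' to 1.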
We will now define a processor that will modify the parts generated by the splitting code above before they are mapped. This can be done simply by accepting the parts as a read/write (inout) data unit, and setting a new flag on each part to make sure it is not processed twice.
The processor itself converts the text to lower case and replaces every character that is not a letter with a space, so that only actual words remain.
class PartPreprocessor( pydaf.Processor ):

    class Meta:
        inout = S( String , flags = ( 'split-wcount' , ) ,
                   not_flags = ( 'preprocessed' , ) , to = 'text' )

    def process( self ):
        t = self.inout.text[ 0 ]
        t.value = re.sub( r'[^a-z]' , ' ' , t.lower( ) )
        t.pydaf.set_flag( 'preprocessed' )
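For reference, here is a small stand-alone illustration of what the substitution does to a sample line (the sample text is made up):

import re

sample = 'Hello, World! 42'
clean = re.sub( r'[^a-z]' , ' ' , sample.lower( ) )
# clean is now 'hello  world    ': every character that is not a
# lower-case letter has been replaced by a space, and the mapping
# code's split( ) call discards the extra whitespace.
print clean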
A last processor is required to remove all words of fewer than three characters from the dictionary.
class ShortWordRemover( pydaf.Processor ):

    class Meta:
        inout = S( Dict , flags = ( 'wordcount' , ) , to = 'wc' )

    def process( self ):
        d = self.inout.wc[ 0 ]
        # Drop every word shorter than three characters.
        for word in d.keys( ):
            if len( word ) < 3:
                del d[ word ]
        # Mark the dictionary as the job's final result.
        d.pydaf.replace_flag( 'wordcount' , 'result' )
Now that the processors have been defined, the only thing left to do is to create a pipeline and have it process the jobs. First, we will initialise the pipeline:
pipeline = pydaf.ThreadedPipeline( )
pipeline.add_runner( pydaf.ThreadedRunner , count = 8 )
pipeline.add_processors( WordCounter( ) , PartPreprocessor , ShortWordRemover )
In this example, we use 8 concurrent threads. The pipeline is initialised using the processors generated by the Map/Reduce helper as well as the two manually defined processors.
Once this is done, we need to generate the input data. We will use the script's command-line arguments as the initial contents of the list.
l = List( sys.argv[ 1: ] )
l.pydaf.set_flag( 'input-files' )
All that remains is to submit the job, wait for it to complete, and display the results:
wanted = S( Dict , flags = [ 'result' ] )
try:
    # Submit the job, then wait until it has been processed.
    result_id = pipeline.process( [ l ] , wanted )
    pipeline.wait( result_id )

    try:
        result = pipeline.get_result( result_id )
    except pydaf.JobFailed , exc:
        # The job failed; print the original exception and exit.
        from traceback import print_exception
        print
        print "JOB %d FAILED" % result_id
        print_exception( *( exc.cause ) )
        print
        sys.exit( -1 )

    # Display the word counts from the resulting dictionary.
    for word , count in result[ 0 ].items( ):
        print "'%s': %d" % ( word , count )
finally:
    pipeline.stop( )
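Assuming the complete script has been saved as, say, wcount.py (the actual file name in the distribution may differ), it can be run with the files to analyse as its arguments, for example python wcount.py file1.txt file2.txt; it will then print one 'word': count line per remaining word.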