Importing Data with Flume

One of the first things to decide when processing data with Hadoop is how to get your data into HDFS. One of the many solutions to this is Apache Flume. We are using CDH (Cloudera’s Distribution of Hadoop) and Cloudera Manager, so setting up Flume was as simple as adding a new Flume service from the main page. The tricky part is writing the configuration.

For my specific project, I have Apache access logs on a mounted file system. The files are rotated into this directory by another mechanism, so I don’t need to worry about trying to import a file that is currently being written to. It is possible to set up Flume to “tail” the current log file, but I wanted to work with files that have already been rotated out. To address this use case, Flume provides the Spooling Directory Source as a built-in feature.

This source periodically checks a single directory you specify for files that need to be transferred. Version 1.4.0 does not support recursive directory checks, so if you store your logs in timestamped directories (e.g. /logs/project_name/2013/12/26), you will have to move all files into a single directory (e.g. /logs/project_name). As a result, you should include the timestamp and the hostname in the name of each file as it gets rotated, to avoid collisions.

When Flume has successfully transferred a file to HDFS, it will RENAME it with a suffix that you can specify in the configuration. If you don’t configure Flume to delete files after they are processed (deletePolicy), you may need a cron job to periodically archive them. In my opinion, keeping the files and archiving them yourself is the better way to go, since the only options for deletePolicy are “immediate” or “never”. A “keep files for xx days before deleting them” option would be better in case you need to do an emergency reload or some troubleshooting.

Let’s take a look at my configuration:

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1
 
tier1.sources.source1.type     = spooldir
tier1.sources.source1.spoolDir = /mnt/hadoop/access-logs
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type   = memory
 
tier1.sinks.sink1.type         = hdfs
tier1.sinks.sink1.hdfs.path = hdfs://nameservice1/incoming/%Y/%m/%d/%H
tier1.sinks.sink1.hdfs.fileType = SequenceFile
tier1.sinks.sink1.hdfs.filePrefix = data
tier1.sinks.sink1.hdfs.fileSuffix = .seq
 
# Roll based on the block size only
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize = 120000000
# seconds to wait before closing the file.
tier1.sinks.sink1.hdfs.idleTimeout = 60
tier1.sinks.sink1.channel      = channel1
 
tier1.channels.channel1.capacity = 100000
tier1.sources.source1.deserializer.maxLineLength = 32768

Luckily, even though the spooling directory source doesn’t support grabbing from timestamped directories, the hdfs sink can write to them, as shown in the hdfs.path setting in the configuration above. The sink also supports writing various file formats; I’ve chosen to write the data as sequence files. More details about getting the time-based escape sequences working are at the bottom.

The roll settings configure the hdfs sink to roll the file based on size. I’ve chosen a size that is close to my configured HDFS block size. Your mileage may vary based on the size of your data; if you have a large volume of data, you may want to write files that are multiples of your configured block size for Map/Reduce job performance. Notice that I set the rollCount and rollInterval options to 0 to disable them. This makes sure the file is rolled only on size and not by the number of events or a number of seconds. I did set an idle timeout just to make sure that Flume will close the file it is writing if it doesn’t see anything new for 60 seconds.

Another important thing to point out is the deserializer.maxLineLength setting, where I am adjusting the maximum line length. By default, the spooling directory source has a maximum line length of 2048. Anything longer than that will be counted AS THE NEXT LINE. Unless you are expecting this, your data will get corrupted during processing. My solution was to just use a really long line length. It doesn’t seem to impact the data import process at all, and all data associated with one event stays together.

In order for the time-related escape sequences to work, there needs to be a key named “timestamp” in the header of the event. To get this, you can do one of the following:

  1. Set hdfs.useLocalTimeStamp to true
  2. Use the built in TimestampInterceptor
  3. Write a custom interceptor

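For reference, the first two options are one-line configuration changes. A minimal sketch, using the standard Flume property names and the agent/source/sink names from my configuration above (the interceptor name “ts” is just illustrative):

# Option 1: stamp each event with the local time at the sink
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Option 2: add a timestamp header at the source with the built-in interceptor
tier1.sources.source1.interceptors = ts
tier1.sources.source1.interceptors.ts.type = timestamp
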
I chose to write my own custom interceptor because I wanted the files placed in HDFS by the time of the actual event, not by the time at which they were imported.
Here is a snippet of the code to show how that works:

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApacheLogTimestampInterceptor implements Interceptor {

    // Header key the HDFS sink looks for when resolving time-based escape sequences
    private static final String TIMESTAMP = "timestamp";

    private static final Logger log =
            LoggerFactory.getLogger(ApacheLogTimestampInterceptor.class);

    @Override
    public void initialize() {
        // nothing to set up
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();

        long ts = extractTimestamp(event);
        headers.put(TIMESTAMP, ts + "");

        return event;
    }

    protected long extractTimestamp(Event event) {
        byte[] body = event.getBody();
        if (body != null) {
            String bodyString = new String(body);
            int index = bodyString.indexOf("\t");
            if (index > 0) {
                // The first tab-delimited token of our log line is the epoch time
                String timeToken = bodyString.substring(0, index);
                try {
                    long ts = Long.parseLong(timeToken);
                    // Check if we need to add milliseconds
                    if (ts < 1300000000000L) {
                        ts *= 1000;
                    }
                    return ts;
                }
                catch (NumberFormatException e) {
                    log.warn(e.getMessage(), e);
                }
            }
        }
        // If we cannot determine the timestamp from the body, return current time
        return System.currentTimeMillis();
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new ApacheLogTimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

Since we know the file is an Apache log, we can cheat and just grab the first token of each tab-delimited line as the timestamp. I added a check in case the timestamp didn’t have millisecond precision, and then inserted it as a header into the event.
Of course, you will need to specify this interceptor in the configuration:

tier1.sources.source1.interceptors=tsi
tier1.sources.source1.interceptors.tsi.type=my.company.hadoop.flume.interceptors.ApacheLogTimestampInterceptor$Builder

Once all this is configured, you can start your Flume service in Cloudera Manager and the import will begin.

Now you have your data in HDFS, in files that should be close to the block size, organized by the hour in which each event happened. This will help greatly when automating Map/Reduce jobs to process your data. Happy processing!

Using Hadoop to Populate Elastic Search

Let’s say you want to create your own comparison shopping site. You go out and get product dumps from Amazon, Shopping.com, Shopzilla, etc., and now you want to be able to run searches on them. All these files are multiple gigabytes each, so how do you get them indexed and searchable in a reasonable amount of time? One solution to this problem is to use Hadoop. Wat? Feed files are one line per product (assuming you don’t choose the XML feed option), and each line is independent of the others, which makes them perfect for the glorified divide-and-conquer algorithm that is Hadoop.

Let’s assume we have already loaded the files into HDFS (that could be a blog post on its own) and that the files are uncompressed. Most providers will give you gzipped files, but gzip is not splittable in Hadoop, so if you want to take full advantage of the parallelism you will need to uncompress the files.

First, let’s look at the set up for the map job:

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    String entryClass = context.getConfiguration().get("ftpfeed.entry.class.name");
    if (entryClass != null) {
        try {
            entry = Class.forName(entryClass).asSubclass(FeedEntry.class).newInstance();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
 
    sourceName = context.getConfiguration().get("ftpfeed.source.name");
    header = context.getConfiguration().get("ftpfeed.header");
    String delim = context.getConfiguration().get("ftpfeed.delimiter");
    if (delim != null && !"".equals(delim)) {
        delimiter = delim;
    }
}

The setup creates an instance of the class that is passed in as a subclass of FeedEntry, so the rest of the code doesn’t need to know the details of that specific implementation. The setup also grabs the source name (e.g. amazon), what the header line looks like, and what the field delimiter is.
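
For context, here is a minimal sketch of what the FeedEntry abstraction might look like. The actual interface isn’t shown in this post, so the method names and fields below are illustrative assumptions:

// Illustrative sketch -- the real FeedEntry interface is not shown in this post.
interface FeedEntry {
    void setFeedSource(String sourceName);
    void set(String[] fields);
}

// Hypothetical implementation that knows the column layout of one provider's feed.
public class AmazonFeedEntry implements FeedEntry {
    private String feedSource;
    private String title;
    private String price;

    @Override
    public void setFeedSource(String sourceName) {
        this.feedSource = sourceName;
    }

    @Override
    public void set(String[] fields) {
        // Column positions depend entirely on the provider's feed specification.
        this.title = fields[1];
        this.price = fields[7];
    }

    // Getters so the Jackson ObjectMapper can serialize the entry to JSON.
    public String getFeedSource() { return feedSource; }
    public String getTitle() { return title; }
    public String getPrice() { return price; }
}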

If we take a look at the mapper:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // protect against blank or invalid lines
    if ("".equals(value.toString().trim())) {
        log.debug("blank line, skipping.");
        return;
    }
 
    String[] fields = value.toString().split(delimiter, -1);
 
    // Skip the header line
    if (header != null && header.equals(fields[0])) {
        return;
    }
 
    entry.setFeedSource(sourceName);
    entry.set(fields);
 
    String json = mapper.writeValueAsString(entry);
 
    outKey.set(rand.nextDouble() + "");
    outValue.set(json);
 
    context.write(outKey, outValue);
}

We can see this mapper is generic enough to support most feed files. The set method of the FeedEntry implementation that we are using knows how to pull information out of the array of tokens. We pre-create the instance of FeedEntry and then just call the set method, rather than instantiating a new object each time, because the object churn and garbage collection you run into when doing it 298347298374923 times hurts performance.

Some feeds include category data, some don’t. If they do, each provider’s hierarchy will almost certainly differ from the others, so if you want to normalize the category hierarchy (also a possible blog post of its own) this would be the place to do it. Once the entry has been set up, it is written to the context as a JSON object using the Jackson library. Instead of creating my own partitioner, I chose to just write a random number as the key. This is done to spread the workload as evenly as possible across the reducers. Yes, there are other ways to accomplish that; I chose random numbers. The trick then is to figure out how many reducers to use in order to get through the data quickly without overloading Elastic Search.
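
All of those knobs end up in the job driver. Here is a hedged sketch of what that driver might look like; the class names (FeedIndexDriver, FeedMapper, FeedIndexReducer, AmazonFeedEntry) and the property values are illustrative assumptions, but the ftpfeed.* and search.* keys come from the setup methods shown in this post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class FeedIndexDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Feed-specific settings read by the mapper's setup() method
        conf.set("ftpfeed.entry.class.name", "my.company.feeds.AmazonFeedEntry");
        conf.set("ftpfeed.source.name", "amazon");
        conf.set("ftpfeed.header", "ITEM_ID");
        conf.set("ftpfeed.delimiter", "\t");

        // Elastic Search settings read by the reducer's setup() method
        conf.set("search.host", "es-node01");
        conf.set("search.port", "9300");
        conf.set("search.cluster.name", "products");
        conf.set("search.index.name", "amazon_en_us");
        conf.set("search.bulk.cap", "1000");

        Job job = Job.getInstance(conf, "feed-to-elasticsearch");
        job.setJarByClass(FeedIndexDriver.class);
        job.setMapperClass(FeedMapper.class);
        job.setReducerClass(FeedIndexReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // This is the knob that controls how hard we hit Elastic Search
        job.setNumReduceTasks(8);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        // The reducer writes to Elastic Search itself, so no HDFS output is needed
        job.setOutputFormatClass(NullOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}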

Now let’s look at the reducer set up:

@Override
protected void setup(Context context) throws IOException, InterruptedException {
 
    super.setup(context);
 
    String bulkCapStr = context.getConfiguration().get("search.bulk.cap", "" + BULK_CAP);
    bulkCap = Integer.parseInt(bulkCapStr);
 
    String host = context.getConfiguration().get("search.host");
    String port = context.getConfiguration().get("search.port");
    String cluster = context.getConfiguration().get("search.cluster.name");
    indexName = context.getConfiguration().get("search.index.name");
 
    Settings settings = ImmutableSettings.settingsBuilder()
                                             .put("cluster.name", cluster).build();
 
    client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(host,
                                                                    Integer.parseInt(port)));
}

After retrieving a bunch of configuration parameters, the setup method creates the client for Elastic Search.
The reduce method:

@Override
protected void reduce(Text key,
                      Iterable<Text> values,
                      Context context)
        throws IOException, InterruptedException {

    bulkRequest = client.prepareBulk();

    for (Text v : values) {
        if (bulkRequest.numberOfActions() >= bulkCap) {
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                log.warn(bulkResponse.getItems());
            }
            bulkRequest = client.prepareBulk();
            context.progress();
        }
        // Attempt to dedupe listings by md5 hashing the title
        String md5id = getHash(v.toString());
        if (md5id != null) {
            bulkRequest.add(client.prepareIndex(indexName, "offers", md5id)
                                  .setSource(v.toString()));
        }
        else {
            bulkRequest.add(client.prepareIndex(indexName, "offers")
                                  .setSource(v.toString()));
        }
    }

    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
    if (bulkResponse.hasFailures()) {
        log.error(bulkResponse.getItems());
    }
}

I chose to use the Java API for Elastic Search instead of posting to the REST URL directly. Each value is added to a BulkRequest object and once a certain threshold is met (configurable) it flushes the inserts to Elastic Search. I do have some business logic in there that creates an MD5 hash from the title minus clothing sizes in an effort to dedupe listings.
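
The getHash method itself isn’t shown in the post. Here is a rough sketch of what it could look like inside the reducer, assuming the JSON carries a “title” field and treating the size-stripping regex as a placeholder for the real business rules:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: the "title" field name and the size-stripping regex are illustrative.
private final ObjectMapper jsonMapper = new ObjectMapper();

protected String getHash(String json) {
    try {
        JsonNode node = jsonMapper.readTree(json);
        String title = node.path("title").asText();
        if (title.isEmpty()) {
            return null;  // no title -> let Elastic Search auto-generate an id
        }
        // Strip size tokens so "Acme Shirt - XL" and "Acme Shirt - S" collapse together
        String normalized = title.toLowerCase()
                                 .replaceAll("\\b(xs|s|m|l|xl|xxl|\\d+)\\b", "")
                                 .replaceAll("\\s+", " ")
                                 .trim();
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] digest = md5.digest(normalized.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }
    catch (IOException | NoSuchAlgorithmException e) {
        return null;  // fall back to an auto-generated id on any parse/digest error
    }
}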

This reducer takes the index to write to as one of its parameters. Since you can specify multiple indexes when conducting your search, you may want to index by feed and language (amazon_en_us, shopzilla_en_us, etc.), or maybe you want to just shove all offers into one index.

Now that your product catalogs are loaded into Elastic Search, you can point your applications at it and search away!

DNS Attacks are everywhere…

It was the best of DNS times, it was the worst of DNS times…  ( May 2013 )

In the beginning, Al Gore and a team of computer pioneers decided to create the internet, and on the 4th addressing try the current method of IP was born. It quickly became obvious that telling your friends to visit 123.101.23.9 would keep the internet club esoteric to just us nerds, and from those needs DNS emerged. DNS is a simple concept: roughly, you ask the root name servers who the authority for a domain is, then you ask that authority what the domain translates to. Since most requests ( at the time ) would hit the root name servers, bombarding them with packets, decisions had to be made so that users would get a timely response. The solution? Building DNS on the UDP protocol. While the choice at the time seemed obvious ( low latency, connectionless, smaller packet headers, no connection state bottlenecks, low CPU consumption ), it is now the reason why I sit here writing an article after a week of sleepless nights. Before you make any rash presumptions, the problem was not your typical amplification attack, but simply a HUGE increase in DNS questions. At worst we were getting about 5MBps of traffic in and out, with responses being sent to no consistent range of destinations.

To convey our problem I first need to explain the importance of DNS to our business model. We perform authoritative answering for any questions sent to us, but do not worry, we are not malicious! This particular use case is an advertising network for domain parking. Owners of SLD’s ( second level domains ) point millions of domains to our DNS servers, which in turn send back responses with varying IP addresses. With such a wide variety of traffic, the DNS back-end must be able to cope with huge amounts of requests of all types, regardless of potential maliciousness from the source. Simply put, we cannot pick and choose whom we respond to, as all traffic is vital. Our normal qps ( questions per second ) before the jump was around 500, with peaks of about 1,000. Once we knew it was not an amplification problem, we were able to start thinking about how to increase capacity.

At the time we were sitting on top of a strong authoritative infrastructure that had chugged along well overpowered for years. The setup was a Netscaler load balancer and behind it, 4 physical 8 core DNS servers. At the 7th layer was PowerDNS utilizing a MySQL backend.

The first step in developing a solution was to establish a solid baseline of the problem, with an emphasis placed on response times. Anything over a 500ms response time was considered a failure for the baseline, and no response at all was considered disastrous. Failure before the attack typically occurred around 1.5k qps ( questions per second ), with disastrous results at around 2k qps ( per server ). With 4 physical servers in rotation at the time, we were capable of handling about 6k qps while being crippled around 8k qps. To determine these results, we used an application called ‘queryperf’, which is part of the contrib section of BIND, and created 250k random-length domains using the available TLDs (http://data.iana.org/TLD/tlds-alpha-by-domain.txt). The python program ‘gen-data-queryperf.py’, also part of queryperf, created the domains for us.

The day of the questions increase, we spiked north of 50k qps, and as noted above we were only capable of responding to 6k qps before crossing our 500ms mean response threshold and 8k qps before we stopped responding at all ( servfail ).

To quickly mitigate the problem, we decided to use the Netscaler’s query cache and placed PowerDNS query-caching servers in front of the PowerDNS authoritative servers to help respond to requests. While this bought us another 10k qps in capacity, the solution was clumsy and not ideal, and we were still encountering some packet loss coupled with sporadic load times. Searching the web, we quickly noticed that we were not alone and that other large DNS providers were experiencing a similar traffic increase.

With a stopgap in place we began working on a way to handle the increased workload and also be able to scale quickly in case of further spikes. Luckily, we had already built a new environment, as we wanted to replace the physical servers with virtual ones and move to the most recent CentOS and PowerDNS releases. When we tested the new setup, we quickly and somewhat nervously watched the same results of 1.5k qps before failure coming through queryperf’s emotionless stdout. As this product was inherited, I was not familiar with the other backend options available to PowerDNS, and upon reading the documentation I saw that it supported BIND-style zone files. Being very well versed in MySQL, I knew that data reporting was something MySQL was well suited for, but the SQL query overhead was probably costing us significant CPU cycles, as sar was showing high numbers of context switches. Immediately I began converting our DNS records into flat text files.
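
To give a sense of the target format, here is a hedged sketch of what one converted zone might look like for a parked domain (the domain, name servers, and IPs below are placeholders, not our actual records):

; example-parked-domain.com -- illustrative only
$TTL 300
$ORIGIN example-parked-domain.com.
@   IN  SOA ns1.example-dns.com. hostmaster.example-dns.com. (
            2013061601 ; serial
            3600       ; refresh
            600        ; retry
            604800     ; expire
            300 )      ; negative caching TTL
    IN  NS  ns1.example-dns.com.
    IN  NS  ns2.example-dns.com.
@   IN  A   192.0.2.10
*   IN  A   192.0.2.10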

A few hundred regular expressions and an hour of QA later, we had 28MB worth of zone files ready to be questioned. With queryperf in hand, we eagerly ran our first tests.

--------------------------------------------------------------------
sh-4.1# ./queryperf -d 250k_domains.txt -s 172.21.9.97 -T 50000

DNS Query Performance Testing Tool
Version: $Id: queryperf.c,v 1.12 2007/09/05 07:36:04 marka Exp $

[Status] Processing input data
[Status] Sending queries (beginning with 172.21.9.97)
[Status] Testing complete

Statistics:

Parse input file: once
Ended due to: reaching end of file

Queries sent: 250000 queries
Queries completed: 249982 queries
Queries lost: 18 queries
Queries delayed(?): 0 queries

RTT max: 0.265384 sec
RTT min: 0.000083 sec
RTT average: 0.000525 sec
RTT std deviation: 0.001293 sec
RTT out of range: 0 queries

Percentage completed: 99.99%
Percentage lost: 0.01%

Started at: Sun Jun 16 12:22:34 2013
Finished at: Sun Jun 16 12:22:40 2013
Ran for: 5.072967 seconds

Queries per second: 49277.276986 qps
Total QPS/target: 49285.430261/50000 qps
--------------------------------------------------------------------

After tuning PowerDNS and the virtual container, I was astonished to see the qps capacity per server increase from 1.5k to over 49k before disastrous failure. Even more remarkable, performance was MUCH more consistent, with response times averaging in the sub-1ms range ( internally of course ). After continued testing we felt confident deploying our solution, adding 3 more of the same virtual machines to replace the old setup. The new structure was capable of handling well over 120k random questions per second ( not including netscaler help ), and if we needed to scale out, we simply had to clone more virtual machines.

In operations, learning how to put out fires is a constant struggle, and your downtime mainly consists of dreaming up even worse fires and trying to prevent them. Deadlines have a great way of focusing the mind, and thanks to an outstanding group of peers, we were quickly able to MacGyver together a CO2 scrubber when the time came and bring our DNS queries home.

Get to know your local Meta Object Protocol

What’s the big deal with Meta-Object Protocols?

As we begin to introduce Perl’s modern object system, Moose, into parts of the production codebase here at Oversee, there’s been a lot of curiosity about what is meant by the term Meta-Object Protocol (MOP) in describing Moose. On the one hand, Moose is inarguably the right way to do modern, large-scale, maintainable Object Oriented programming in Perl; on the other hand, Moose is not the only modern object system for Perl.

That is, Moose implies more than just the automated accessor generation and enhanced syntax for defining classes and attributes offered by its many predecessors. Moose is an object system built on top of a Meta-Object system (Class::MOP). That’s a fancy way of saying “You know how you have classes and objects? And you can manipulate objects by creating them, giving their attributes values, and calling methods on them? Yeah, you can do the same thing with classes.” Think about the profound implications of that statement for a moment… it’s the idea that the rest of this post will devote itself to.

So, what IS a Meta-Object Protocol?

Well, in the current context ‘meta’ information is information about something else seen from a level beyond: ‘an X about X’, so to speak. This self-referential notion of the word ‘meta’ was first introduced by Willard Van Orman Quine in his 1937 coinage of the term “metatheorem” where ‘meta’ clearly has the modern meaning of “an X about X”. Earlier uses of the prefix ‘meta’, such as “metaphysics”, do not have this doubled conceptual structure… they are about or beyond X but they are not themselves an X.

Douglas Hofstadter’s 1979 book “Gödel, Escher, Bach” popularized this meaning of the term ‘meta’. Citing Quine, Hofstadter explores recursion, self-reference, and strange-loops and pioneers the use of ‘meta’ also as a standalone word… often as a directional preposition, as in “going meta” to indicate a change in the current level of abstraction.

So, to apply this notion back to programming, information about the attribute values of an object instance is information on the object level; information about the properties of the object itself, about its attributes, its structure, etc. is meta information. In most languages this information is captured in the class definition for the object, so the class definition is a meta-object instance. You have all the functionality of what we are calling a MOP at the class definition level… but typically only at development time.

This typical class-based meta-level is not available at runtime: you cannot manipulate classes at runtime; you cannot add new classes at runtime. So the idea of a true MOP is to collapse the meta-level (classes) and the object level (objects) via an explicit API… a “protocol”. A MOP seeks to make class definitions normal objects, and object properties normal attribute values of those class definitions, so that classes themselves can be manipulated at runtime: a self-referential system where classes define objects which define classes which define objects, potentially ad infinitum.

One of the best-known runtime MOPs is the one described in the book “The Art of the Metaobject Protocol” (AMOP); it applies to the Common Lisp Object System (CLOS) and allows the mechanisms of inheritance, method dispatching, class instantiation and so on to be manipulated. Perl’s Class::MOP, the underlying MOP used by the Moose object system, is based on AMOP and CLOS. In that book the author, Gregor Kiczales, explains the idea of the MOP this way:

In a language based upon [our] metaobject protocols, the language implementation itself is structured as an object-oriented program. This allows us to exploit the power of object-oriented programming techniques to make the language implementation adjustable and flexible. In effect, the resulting implementation does not represent a single point in the overall space of language designs, but an entire region within that space.

So, a MOP is an interpreter of the semantics of a program that is open and extensible. That is, a MOP determines what a program means and what its behavior is. It is extensible in that a programmer (or “metaprogrammer”) can alter program behavior by extending parts of the MOP.

In this way, Perl’s Class::MOP manifests as a set of classes and methods that allow a program to inspect the state of the supporting system and alter its behavior. Moose, in contrast, is implemented as an object-oriented program where all objects are metaobjects derived from Class::MOP.

Ok, wait, so what are Metaprograms?

Metaprograms then are programs that manipulate other programs… or themselves. This can reduce development time by allowing programmers to minimize the number of lines of code to express a solution, or it can give programs greater flexibility to efficiently handle new situations.

Higher-order functions (sometimes called functors), i.e. functions that take one or more functions as arguments and return a function, are a simplistic form of metaprogramming in this sense.

The language in which the metaprogram is written is called the metalanguage. The language of the programs that are manipulated is called the object language. The ability of a programming language to be its own metalanguage is called reflection or reflexivity. Class::MOP offers reflexivity to Perl. And because Moose is built upon this powerful and well-defined MOP substrate, any future object systems likewise based on Class::MOP will interoperate seamlessly with Moose.

Moose may be the current state of the art for Object Oriented programming in Perl, but Class::MOP defines a robust future.

How is all of this useful?

Typically you probably don’t want to meta-program the Perl MOP… you’d just want to use the Moose defaults. Metaprogramming the MOP is bringing out the big guns… it’s a powerful tool we should employ only when the circumstance warrants. But as with other ideas that force a substantial shift in the way you think about things, the utility of a MOP and all this metaprogramming stuff might not be immediately apparent.

Let’s consider the way method resolution order works in languages, such as Perl, that support multiple inheritance. In Perl the algorithm used to locate the method in the inheritance hierarchy to dispatch to is a simple depth first search. As you may be aware such a strategy can create ambiguity in cases where multiple ancestors inherit from a common base class: this is known as the ‘diamond inheritance problem‘. Class::MOP enables us to use an alternate method lookup strategy, say C3, that behaves more predictably in these scenarios. We don’t need to fiddle with the source code that we maintain to solve these edge cases, we can fix this issue in the MOP in one location and all of our code can benefit. In fact, C3 MRO has already been implemented for Perl (but this is a somewhat moot point as other features of Moose eliminate the need to use inheritance at all in order to reuse previously defined behavior… I’ll talk more about this feature of Moose, called Roles, in a future blog post).

In a similar manner we might want to, say, use the MOP to enable our code to use a distributed service bus to locate the correct method to dispatch to, or to access a global cache to retrieve an object’s attribute values. This could all be done transparently in one place by metaprogramming the MOP rather than tinkering with potentially hundreds of thousands of lines of source code… only to find out later that network latency ate any performance we gained through distributing our processing. As with commodity rapid application development (RAD) tools, the ability to create cheap prototypes such as this can be a disruptive technology.

So, that’s what a MOP is and how it might (or might not) be useful. Remember, just because we’re using Moose doesn’t mean we have to run that engine on nitrous. But in the race of commerce where speed and efficiency of development time can be paramount to a company’s survival it’s nice to know that we can if we need to.

RIPS 2013 was a blast!

Research in Industrial Projects for Students (or RIPS) is an annual summer program organized by UCLA’s Institute for Pure and Applied Math. Since 2001, students from all over the world have been selected to participate in this program. For about eight weeks they work in groups on real-world projects proposed by industry sponsors. And for the first time, Oversee.net was one of the sponsors!

Our team (starting on the left) consisted of Corina Gurau (Jacobs University Bremen, Germany), Maya Rotmensch (Harvard University), and Sorcha Gilroy (National University of Ireland, University College Cork). Their academic mentor, Cristina Garcia-Cardona, provided guidance and support. The picture shows our team during an on-site visit to our downtown LA data center.

The task we proposed to our team was to try and improve domain categorization given sparse data input. One of the challenges we face at Oversee.net is how to classify millions of domains automatically, many of which do not have a history of content. Our team did a formidable job tackling this difficult problem. At the end of RIPS, they were able to significantly improve categorization performance. Check out the slides of their final presentation here:

Everyone involved in RIPS 2013 had a great time. We would like to thank IPAM for the opportunity to participate in this wonderful research challenge, and most of all we would like to thank our students and academic mentor for their dedication and hard work!

Let’s (not) Play Ruby Golf!

Just as a person who writes using simple words, using simple syntax, is a better communicator than a person who writes using big words and cryptic syntax, so an engineer who writes simple code, using clean syntax, is a better engineer than one who writes cryptic or obfuscated code.

Ruby Golf is the practice of writing Ruby in as few characters as possible, or, more generally, writing Ruby one-liners. I think Ruby Golf arises out of a combination of these two common engineering practices:

“When the code works, I’m done” + “The fewer lines of code, the better” == Ruby Golf

Obviously, code that works is better than code that does not, and a few lines of well-expressed code is better than many lines of code no one can make sense of. But the above combination, of itself, does not make for useful code!

I find the following philosophy much more useful:

“Write code for the intelligent stranger” == Write code that any intelligent engineer, of any experience level, can easily understand.

The best Ruby Golf channel I’ve been able to find is stackoverflow.com. Consider this stackoverflow.com problem:

Given this input array of arrays . . .

input_array =
[[1, 1, 1, 1],
[1, 2, 1, 1],
[1, 3, 1, 7],
[1, 1, 4, 1]]

. . . provide an output_array whose elements are the sums of its “columns”. In other words, output_array’s first element is the sum of the 0th elements of input_array’s inside arrays, output_array’s second element is the sum of the 1st elements of input_array’s inside arrays, and so on.

Here’s one stackoverflow.com answer:

output_array = input_array.transpose.map{|e| e.inject(:+)}
=> [4, 7, 7, 10]

Would a junior programmer understand that one-liner right away? I don’t think so. I think that the shorthand use of inject . . .

e.inject(:+)

. . . is equivalent to using an obscure vocabulary word, when the engineer could just as easily have used a simple one.

I think a junior programmer would understand the following more easily:

input_array =
[[1, 1, 1, 1],
[1, 2, 1, 1],
[1, 3, 1, 7],
[1, 1, 4, 1]]

output_array = []
# Array#transpose flips rows and columns
# in an array of arrays whose inside
# arrays are equal in length
input_array.transpose.each do |a|
  sum = 0
  a.each do |e|
    sum += e
  end
  output_array.push(sum)
end
output_array

=> [4, 7, 7, 10]

Ok, maybe I’m exaggerating a bit to make a point. But I do think that, in the longer code, it’s clearer

– how Array#transpose works, and where you will run into problems, i.e. if your input is an array of arrays whose inside arrays are *not* of equal length
– what the “sum” accumulator is doing

How about this one:

Take this input array of integers . . .

input = [4, 3, 3, 1, 6, 6]

. . . and return an array whose repeated elements are removed

Here’s one stackoverflow.com answer:

input.
  group_by{|e| e }.
  map{|e, es| [e, es.length] }.
  reject{|e, count| count > 1 }.
  map(&:first)
=> [1, 4]

It’s easy (and common) to chain methods together like this because, on the console, you can chain them one at a time, and get intermediate output that gets you closer and closer to the answer you want. Below, I’ve included the output at each step:

# input = [4, 3, 3, 1, 6, 6]
input
  .group_by {|e| e }
  # {6=>[6, 6], 1=>[1], 3=>[3, 3], 4=>[4]}
  .map {|e, es| [e, es.length] }
  # [[6, 2], [1, 1], [3, 2], [4, 1]]
  .reject {|e, count| count > 1 }
  # [[1, 1], [4, 1]]
  .map(&:first) # [1, 4]

=> [1, 4]

Step-by-step, the above code breaks the problem into intermediate steps that edge closer and closer to the desired outcome. I think that the above style is all about the engineer walking through the thinking process in a way he or she can understand, rather than expressing a straightforward solution in a clear way that other engineers can understand.

I think the following is a better solution:

input_array = [4, 3, 3, 1, 6, 6]
# remove elements from the array . . .
input_array.reject! do |element|
  # . . . that occur more than once
  input_array.count(element) > 1
end

=> [4, 1]

What’s better about this solution:

– the output is not sorted (sorting was not part of the problem as posed)
– the current object remains an array, rather than being converted to a hash, and back to an array
– input_array is modified in place as it’s iterated over, with no intermediate hashes or arrays
– there are useful comments

The first solution reads: “Given an input array, convert it to a hash whose keys are the unique elements of the input array, and whose values are arrays of all occurrences of those elements, then convert that hash to an array of arrays whose first element is the hash key, and whose second element is the number of occurrences of that key, then remove all elements of that array of arrays whose elements occur more than once, then remove the second element of this array of arrays, thereby returning an array of only the elements of the original array that occur only once.”

Even without the comments, the second solution reads “Given an input array, remove elements that occur more than once”.

:) :)

Writing good code is hard. Writing clean code that any intelligent engineer of any experience level can understand, is harder. But it’s also more useful, more efficient, and will make your projects far easier for everyone to work with!

– Charlie McElfresh

DDOS – Are You Prepared?

Think You’re Prepared for a DDOS Attack?   Think Again.

After getting hit by three DDOS attacks in March 2012 with peaks hitting 6 Gb/s, Oversee.net invested in a mitigation strategy designed to protect against further attacks.  In fact, thanks to the changes we had implemented, several smaller DDOS attacks in the ensuing months were successfully handled without any network down time.  Over the next 12 months our confidence soared, and in retrospect, we may have gotten a tad bit complacent.

The wake-up call occurred on March 25, 2013, which was right in the midst of the massive 300 Gb/s DDOS attacks that CyberBunker was launching against Spamhaus.   On March 25th, and over the next 5 days, our network was periodically shut down by DDOS attacks with 11Gb/s of sustained peaks.  Throughout those critical 5 days, we struggled to figure out what we had overlooked in our prior mitigation strategy and how to keep our network up and running.  Luckily, through great teamwork internally, with our ISPs, and with our network hardware vendor, we figured it out and implemented the solutions.

I’d like to share with you the steps we took in March 2012 and March 2013 to protect our network.  I’m hoping that by sharing these steps and key learnings, you might be able to look at your own DDOS mitigation strategies from a fresh perspective and take whatever steps you need to protect your network.

Response to March 2012 DDOS attacks with peaks hitting 6 Gb/s:

  • “Plan A”:  Our first idea was to try mitigating the attacks using GRE tunnels, which are used to filter traffic via encapsulation between our router and our ISP.   Unfortunately, we soon realized that our routers were not able to handle the protocol due to the lack of CPU and memory horsepower.
  • “Plan B”:  New border routers were quickly purchased and installed.  These border routers were capable of handling 80Gb/s across the backplane.  And since our network only supported connectivity to 2 ISPs at 10 Gb/s and 1 Gb/s respectively, we now had the bandwidth capable of handling 11Gb/s, plus the ability to block traffic at the ISP level via GRE tunnels.

Response to March 2013 DDOS attacks with sustained peaks of 11 Gb/s:

  • This DDOS attack was significantly more sophisticated than the ones in March 2012.  At first, it looked like they were hitting our border router, then our load balancers, and even our core router.  We felt helpless because everything we tried only fixed the problem temporarily.  Here is what we tried:
  • Upgraded our License on Load Balancers – we have high-end load balancers capable of handling 17 million packets per second; unfortunately, our license only allowed us to receive 3 million per second.  We were seeing well above this limit.
  • Blocked Traffic via ACLs – we utilized ACLs at the border router to block traffic from certain IP ranges from China and Russia.  We also had to change the routing engine for BGP updates, since the attackers seemed to be attacking us at the BGP layer.
  • Modified Cabling on Routers – it appeared we only had a problem with the traffic coming from one of our ISPs.  It turned out this ISP was sending us unfiltered traffic, which was filtered by default by two other ISPs.  We learned a tough lesson about making sure to ask all of our ISPs about their filtering capabilities.  Don’t make assumptions.

After many hours of troubleshooting, it turned out the problem was fully resolved by working with our network hardware vendor.  We had to upgrade our firmware, and modify our routing engine as well.  It also turned out the DDOS attack at the BGP layer was only happening with one of our ISPs.  Unfortunately it was the ISP from whom we were getting 10Gb/s of bandwidth.

With a few million domains utilizing our network each month, you could argue that we are quite a bit more susceptible to DDOS attacks than your network is.   Perhaps this makes you feel safer, but why take the risk?

Even if you think you’ve already got well designed DDOS mitigation strategies in place, don’t delay in reviewing them thoroughly with your ISP and hardware vendors, and then investing in the appropriate protections.  As our experience shows, DDOS attacks can come in many shapes, so it’s important to protect your network at various layers.  It may be difficult to justify the up-front expense, but chances are it would be a lot cheaper than suffering through network down time at the hands of a sophisticated DDOS attack.

Machine Learning Hack Night at Oversee.net

We’re hosting another gathering of the LA Machine Learning meetup here at Oversee.net. Actually, we’re hosting a two-parter, on consecutive Tuesday evenings, May 7th and 14th.

The first evening will be a workshop, where Rob Zinkov will introduce participants to several ML tools, including the R language and RStudio, and the Python language and scikit-learn toolkit. He’ll also provide a data set and describe a machine learning problem for participants to test their new knowledge on over the subsequent week.

The second evening will be a show-and-tell session of the best classifiers participants have built. We should learn a lot from each other’s approaches.

Rob has asked participants to RSVP to the meetup announcement, and as part of the RSVP, you’re requested to do a little homework. You need to install the tools, download a data set and a pair of scripts, and run the scripts. When you RSVP, you’ll be asked to fill in some information produced by the scripts. This way, everyone who shows up the first night will already have everything they need. We won’t need to waste time with installation issues, and can get right down to the algorithms.

If you run into any trouble installing the tools or running the scripts, feel free to ask questions in the comments here or on the meetup page.

LA Ruby March Meetup With Special Guest Ryan Bigg

We are hosting the next LA Ruby Meetup on 3/12 7:00 pm at Oversee.net HQ!  Special guest Ryan Bigg (aka Radar), co-author of Rails 4 In Action, author of the upcoming book Multitenancy with Rails, and winner of a 2011 Ruby Hero award, will talk about “Maintaining a large OSS project: war stories”. See details and register here.

 

Introducing Roger: A Java RPC Framework using RabbitMQ & Jackson

As my colleague Patrick mentioned in our previous post, we at Oversee’s Retail Group have been using RabbitMQ as the communication layer for the remote procedure calls done from the UI (written in Python) to the backend (written in Java). In Patrick’s post, he described his homegrown Python framework, Chu, for quickly and easily creating RPC clients within Tornado. This post is all about the Java framework, Roger, for creating stand-alone RPC servers.

Our legacy website backends used Java Servlets to send JSON-formatted RPC responses to the Python UI. While we may have moved away from using Servlets and HTTP (mainly for load-balancing reasons, as detailed in the previous post), we have always been happy with JSON as a data serialization format. If you’re writing Java, the Jackson JSON library is currently the uncontested choice for parsing & creating JSON. Add on the Jackson Mapper, and you can write all of your code using Plain-Old-Java-Objects while barely being aware that JSON serialization & deserialization is happening automatically. So, Roger uses Jackson’s ObjectMapper for all of its message serialization & deserialization. And if you want more fine-grained control over how your Java objects turn into JSON, you can add Jackson annotations to your request & response objects.
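
To illustrate the kind of work Roger leans on Jackson for, here is a minimal, hypothetical sketch of a POJO round trip with the Jackson 2 ObjectMapper. The DomainLookupRequest class and its fields are made up for the example and are not part of Roger’s actual API:

import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonRoundTripExample {

    // A plain old Java object; Jackson maps its fields to and from JSON automatically.
    public static class DomainLookupRequest {
        private String domainName;
        private int maxResults;

        public String getDomainName() { return domainName; }
        public void setDomainName(String domainName) { this.domainName = domainName; }
        public int getMaxResults() { return maxResults; }
        public void setMaxResults(int maxResults) { this.maxResults = maxResults; }
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        DomainLookupRequest request = new DomainLookupRequest();
        request.setDomainName("example.com");
        request.setMaxResults(10);

        // Serialize to JSON: {"domainName":"example.com","maxResults":10}
        String json = mapper.writeValueAsString(request);

        // Deserialize back into a POJO without any hand-written parsing.
        DomainLookupRequest parsed = mapper.readValue(json, DomainLookupRequest.class);
        System.out.println(parsed.getDomainName() + " / " + parsed.getMaxResults());
    }
}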
