Writing Tweets to HBase – Simply

Hello World!

Many folks may know that the South Florida Evangelism team is undertaking a task that many think is impossible.  Well, when I hear that, all I hear is “there is still a chance!”  The end goal is to create a teddy bear that can hold a conversation about anything.  Step one is to collect as much dialogue as possible, from as many sources as possible, and annotate it.  And what better source for powering an association engine for word and phrase relevance than a platform that forces you to get your message across in 140 characters?

So, like any normal developer, I started by looking for samples that were already out there.  MSDN has a great starter sample for writing tweets to HBase and doing sentiment analysis in C#.  The only issue is that the sample is poorly structured and difficult to understand, with no separation of concerns.  So I want to walk through simplifying the solution and separating a few concerns out.

Configuration Files

The first thing to do is to move important information into configuration files.  You should never, ever, ever hardcode important information such as cluster credentials and API keys into the code base, especially when Azure can perform config transformations on your files.  We start by creating a configuration file as below…

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <appSettings>
    <add key="ClusterName" value="https://projectteddy.azurehdinsight.net" />
    <add key="HadoopUserName" value="******" />
    <add key="HadoopUserPassword" value="******" />
    <add key="HBaseTableName" value="tweetSampleSentiment" />
    <add key="ConsumerKey" value="******" />
    <add key="ConsumerSecret" value="******" />
    <add key="AccessToken" value="******" />
    <add key="AccessSecret" value="******" />
  </appSettings>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
  </startup>
  <runtime>
    <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
      <dependentAssembly>
        <assemblyIdentity name="System.Net.Http.Primitives" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
        <bindingRedirect oldVersion="0.0.0.0-4.2.29.0" newVersion="4.2.29.0" />
      </dependentAssembly>
    </assemblyBinding>
  </runtime>
</configuration>
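With everything in appSettings, a missing or misspelled key shows up much later as a confusing NullReferenceException (for example when the password is read character by character). A small guard can fail fast with a clear message instead. The sketch below is a hypothetical helper (SettingsReader and GetRequired are illustrative names, not part of the MSDN sample); it only relies on the fact that ConfigurationManager.AppSettings is a NameValueCollection.

```csharp
using System;
using System.Collections.Specialized;

// Hypothetical helper: read a required appSettings value or fail with a clear message.
public static class SettingsReader
{
    // Returns the value for `key`, or throws if the key is missing or blank.
    public static string GetRequired(NameValueCollection settings, string key)
    {
        string value = settings[key];
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                string.Format("Missing required appSettings key \"{0}\".", key));
        }
        return value;
    }
}
```

Usage would look like `SettingsReader.GetRequired(ConfigurationManager.AppSettings, "ClusterName")`, so the application stops at startup rather than partway through connecting to the cluster.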

Next, we create a domain class, TweetSentimentData, that contains only the tweet data we write to HBase.

using System;
using System.Globalization;
using Tweetinvi.Core.Interfaces;

namespace TwitterStreamer
{
    // Domain object holding only the tweet fields we persist to HBase
    public class TweetSentimentData
    {
        public string Id { get; set; }
        public string Text { get; set; }
        public string ReplyToId { get; set; }
        public string Coordinates { get; set; }   // "longitude,latitude", or null when the tweet is not geotagged
        public float Sentiment { get; set; }
        public DateTime CreatedOn { get; set; }

        public TweetSentimentData() { }

        public TweetSentimentData(ITweet tweet)
        {
            this.Id = tweet.IdStr;
            this.Text = tweet.Text;
            // Store an empty string rather than null for tweets that are not replies
            this.ReplyToId = tweet.InReplyToStatusIdStr ?? string.Empty;
            this.CreatedOn = tweet.CreatedAt;
            if (tweet.Coordinates != null)
            {
                // Invariant culture keeps '.' as the decimal separator, so the ',' between
                // longitude and latitude stays unambiguous on every machine
                this.Coordinates = tweet.Coordinates.Longitude.ToString(CultureInfo.InvariantCulture) + ","
                    + tweet.Coordinates.Latitude.ToString(CultureInfo.InvariantCulture);
            }
            this.Sentiment = SentimentHelper.CalculateSentiment(this.Text);
        }
    }
}
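The Coordinates field packs two numbers into one string separated by a comma. With a plain ToString(), a machine whose culture uses ',' as the decimal separator would produce something like "-80,19,25,76", which can no longer be split back apart. A minimal sketch of a round-trippable format, assuming the "longitude,latitude" layout used above (CoordinateFormatter is a hypothetical name and the sample point is made up):

```csharp
using System;
using System.Globalization;

// Hypothetical helper for the "longitude,latitude" string stored in HBase.
public static class CoordinateFormatter
{
    // Always uses '.' for decimals, regardless of the current culture.
    public static string Format(double longitude, double latitude)
    {
        return longitude.ToString(CultureInfo.InvariantCulture) + ","
            + latitude.ToString(CultureInfo.InvariantCulture);
    }

    // Parses the stored string back into (longitude, latitude).
    public static Tuple<double, double> Parse(string coordinates)
    {
        string[] parts = coordinates.Split(',');
        return Tuple.Create(
            double.Parse(parts[0], CultureInfo.InvariantCulture),
            double.Parse(parts[1], CultureInfo.InvariantCulture));
    }
}
```

Anything that later reads the Coordinates column (for example a downstream job) can then parse it the same way, independent of where it runs.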

Now we have the beginnings of a separation of concerns.  My biggest issue with the MSDN sample is that HBaseWriter.cs did far too much and was too confusing to understand and extend.  To simplify things further, we also move the sentiment analysis into its own helper class, SentimentHelper, with a method called “CalculateSentiment”.  We do this because the sentiment calculation is very likely to change over time; before this change it was spaghettied into the code that writes to HBase, which clearly breaks the single responsibility principle.
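Because the scoring approach is expected to change, one option (not part of the original sample) is to hide it behind an interface so callers do not care which scorer they get. The sketch below assumes a hypothetical ISentimentScorer interface and an in-memory word-to-polarity map; it uses the same normalization as CalculateSentiment, (positive words minus negative words) divided by the number of words that have a polarity, with a shortened separator list for brevity.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstraction: callers depend on this, not on a specific scoring method.
public interface ISentimentScorer
{
    float Score(string text);
}

// Lexicon-based scorer: (positive - negative) / (number of words with a polarity).
public class LexiconSentimentScorer : ISentimentScorer
{
    // Shortened separator set for this sketch
    private static readonly char[] Separators = { ' ', '!', '?', '.', ',', ';', ':', '"', '\'' };
    private readonly IDictionary<string, string> _polarityByWord;

    public LexiconSentimentScorer(IDictionary<string, string> polarityByWord)
    {
        _polarityByWord = polarityByWord;
    }

    public float Score(string text)
    {
        int positive = 0, negative = 0;
        string[] words = text.ToLowerInvariant().Split(Separators, StringSplitOptions.RemoveEmptyEntries);
        foreach (string word in words)
        {
            string polarity;
            if (!_polarityByWord.TryGetValue(word, out polarity)) continue;   // unknown words are neutral
            if (polarity == "positive") positive++;
            else if (polarity == "negative") negative++;
        }
        int polarWords = positive + negative;
        return polarWords == 0 ? 0f : (float)(positive - negative) / polarWords;
    }
}
```

Swapping in a different approach later would then mean adding another ISentimentScorer implementation rather than editing the writer. The static SentimentHelper that follows is the refactored class itself.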

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

namespace TwitterStreamer
{
    // One row of the sentiment dictionary file. (This class is not shown in the
    // original sample; its members are inferred from LoadDictionary below.)
    public class SentimentDictionaryItem
    {
        public string Type { get; set; }
        public int Length { get; set; }
        public string Word { get; set; }
        public string Pos { get; set; }
        public string Stemmed { get; set; }
        public string Polarity { get; set; }
    }

    public static class SentimentHelper
    {
        // Characters used to split tweet text into words
        private static readonly char[] _punctuationChars =
            new[] {
                ' ', '!', '"', '#', '$', '%', '&', '\'', '(', ')', '*', '+', ',', '-', '.', '/',   // ASCII 32-47
                ':', ';', '<', '=', '>', '?', '@', '[', ']', '^', '_', '`', '{', '|', '}', '~' };   // ASCII 58-64 + misc.

        // Tab-separated sentiment dictionary file, relative to the working directory
        private static readonly string DictionaryFileName = Path.Combine("data", "dictionary.tsv");

        // The sentiment dictionary, loaded from the file on first use.
        // Lazy<T> makes the one-time load thread-safe.
        private static readonly Lazy<Dictionary<string, SentimentDictionaryItem>> _dictionary =
            new Lazy<Dictionary<string, SentimentDictionaryItem>>(LoadDictionary);

        public static Dictionary<string, SentimentDictionaryItem> Dictionary
        {
            get { return _dictionary.Value; }
        }

        // Returns a score in [-1, 1]: (positive words - negative words) / (number of words with a polarity)
        public static float CalculateSentiment(string text)
        {
            string[] words = text.ToLowerInvariant().Split(_punctuationChars);
            float sentiment = 0;
            float neutralWords = 0;
            foreach (string word in words)
            {
                SentimentDictionaryItem item;
                // Words that are not in the dictionary count as neutral
                if (!Dictionary.TryGetValue(word, out item))
                {
                    neutralWords += 1;
                    continue;
                }
                if (string.Equals(item.Polarity, "positive", StringComparison.OrdinalIgnoreCase))
                {
                    sentiment += 1;
                }
                else if (string.Equals(item.Polarity, "negative", StringComparison.OrdinalIgnoreCase))
                {
                    sentiment -= 1;
                }
                else
                {
                    neutralWords += 1;
                }
            }
            // Normalize by the number of words that carried a polarity
            float divisor = words.Length - neutralWords;
            if (divisor < 1)
            {
                return 0;
            }
            return sentiment / divisor;
        }

        // Load the sentiment dictionary from a tab-separated file with the columns:
        // type, length, word, part of speech, stemmed, polarity
        private static Dictionary<string, SentimentDictionaryItem> LoadDictionary()
        {
            var items = File.ReadLines(DictionaryFileName)
                .Where(line => !string.IsNullOrWhiteSpace(line))
                .Select(line =>
                {
                    var fields = line.Split('\t');
                    return new SentimentDictionaryItem
                    {
                        Type = fields[0],
                        Length = Convert.ToInt32(fields[1]),
                        Word = fields[2],
                        Pos = fields[3],
                        Stemmed = fields[4],
                        Polarity = fields[5]
                    };
                });

            var dictionary = new Dictionary<string, SentimentDictionaryItem>();
            foreach (var item in items)
            {
                // Keep the first entry when a word appears more than once
                if (!dictionary.ContainsKey(item.Word))
                {
                    dictionary.Add(item.Word, item);
                }
            }
            return dictionary;
        }
    }
}
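LoadDictionary reads directly from a file path, which makes the parsing hard to test on its own. A hypothetical variant that parses lines from any source (a file, a stream, or an in-memory list) is sketched below. It assumes the same six tab-separated columns as above (type, length, word, part of speech, stemmed, polarity), skips blank or short lines, and keeps the first entry for a repeated word. SentimentDictionaryParser is an illustrative name only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical parser: turns tab-separated lexicon lines into a word -> polarity map.
public static class SentimentDictionaryParser
{
    public static Dictionary<string, string> Parse(IEnumerable<string> lines)
    {
        var polarityByWord = new Dictionary<string, string>();
        foreach (string line in lines)
        {
            string[] fields = line.Split('\t');   // split on the tab character
            if (fields.Length < 6) continue;      // skip blank or malformed lines
            string word = fields[2];
            string polarity = fields[5];
            if (!polarityByWord.ContainsKey(word))  // first entry for a word wins
            {
                polarityByWord.Add(word, polarity);
            }
        }
        return polarityByWord;
    }
}
```

In the application this could be called as `SentimentDictionaryParser.Parse(File.ReadLines(path))`, while tests can pass a few literal lines.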

That leaves us with a nicely cleaned-up HBaseWriter.cs whose only responsibilities are adding tweets to a queue, building the HBase cells for each tweet, and dequeuing tweets and writing them to the table (it also creates the table on first run if it does not exist).  That is all it does now.  It is much simpler, and much easier to deal with and fix when issues come up.

using Microsoft.HBase.Client;
using org.apache.hadoop.hbase.rest.protobuf.generated;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Globalization;
using System.Security;
using System.Text;
using System.Threading;

namespace TwitterStreamer
{
    public class HBaseWriter
    {
        // Tweets are queued by the caller and written to HBase on a separate thread
        private readonly Thread WriterThread;
        private readonly Queue<TweetSentimentData> WriteQueue = new Queue<TweetSentimentData>();
        private volatile bool ThreadRunning = true;

        // For writing to HBase
        public HBaseClient client;
        // HDInsight HBase cluster and HBase table information
        public string ClusterName { get; set; }
        public string HadoopUserName { get; set; }
        public string HBaseTableName { get; set; }

        public HBaseWriter()
        {
            // Get the Hadoop cluster info from configuration and create the connection
            this.ClusterName = ConfigurationManager.AppSettings["ClusterName"];
            this.HadoopUserName = ConfigurationManager.AppSettings["HadoopUserName"];
            string hadoopUserPassword = ConfigurationManager.AppSettings["HadoopUserPassword"];
            this.HBaseTableName = ConfigurationManager.AppSettings["HBaseTableName"];
            SecureString pw = new SecureString();
            foreach (char c in hadoopUserPassword)
            {
                pw.AppendChar(c);
            }
            pw.MakeReadOnly();
            Uri clusterUri = new Uri(this.ClusterName);
            ClusterCredentials creds = new ClusterCredentials(clusterUri, this.HadoopUserName, pw);
            this.client = new HBaseClient(creds);
            // Create the table (column family "d") if it does not exist yet
            if (!client.ListTables().name.Contains(this.HBaseTableName))
            {
                var tableSchema = new TableSchema();
                tableSchema.name = this.HBaseTableName;
                tableSchema.columns.Add(new ColumnSchema { name = "d" });
                client.CreateTable(tableSchema);
                Console.WriteLine("Table \"{0}\" created.", this.HBaseTableName);
            }
            // Start the writer thread; as a background thread it will not keep the process alive on exit
            WriterThread = new Thread(new ThreadStart(WriterThreadFunction));
            WriterThread.IsBackground = true;
            WriterThread.Start();
        }

        // Enqueue a received tweet to be written by the writer thread
        public void WriteTweet(TweetSentimentData tweet)
        {
            lock (this.WriteQueue)
            {
                this.WriteQueue.Enqueue(tweet);
            }
        }

        // Populate a CellSet with one row, keyed by the tweet id, for the given tweet
        private void CreateTweetByWordsCells(CellSet set, TweetSentimentData tweet)
        {
            // Create a row with a key
            var row = new CellSet.Row { key = Encoding.UTF8.GetBytes(tweet.Id) };
            // Add columns to the row. Dates and numbers use the invariant culture
            // so stored values do not depend on the machine's locale.
            row.values.Add(
                new Cell { column = Encoding.UTF8.GetBytes("d:Text"),
                    data = Encoding.UTF8.GetBytes(tweet.Text) });
            row.values.Add(
                new Cell { column = Encoding.UTF8.GetBytes("d:CreatedOn"),
                    data = Encoding.UTF8.GetBytes(tweet.CreatedOn.ToString(CultureInfo.InvariantCulture)) });
            row.values.Add(
                new Cell { column = Encoding.UTF8.GetBytes("d:ReplyToId"),
                    data = Encoding.UTF8.GetBytes(tweet.ReplyToId) });
            row.values.Add(
                new Cell { column = Encoding.UTF8.GetBytes("d:Sentiment"),
                    data = Encoding.UTF8.GetBytes(tweet.Sentiment.ToString(CultureInfo.InvariantCulture)) });
            // Coordinates are only present for geotagged tweets
            if (tweet.Coordinates != null)
            {
                row.values.Add(
                    new Cell { column = Encoding.UTF8.GetBytes("d:Coordinates"),
                        data = Encoding.UTF8.GetBytes(tweet.Coordinates) });
            }
            set.rows.Add(row);
        }

        // Drain the queue and write each batch of tweets (one CellSet) to HBase
        public void WriterThreadFunction()
        {
            while (ThreadRunning)
            {
                CellSet set = new CellSet();
                // Check and drain the queue under the same lock the producer uses
                lock (WriteQueue)
                {
                    while (WriteQueue.Count > 0)
                    {
                        CreateTweetByWordsCells(set, WriteQueue.Dequeue());
                    }
                }
                if (set.rows.Count > 0)
                {
                    client.StoreCells(this.HBaseTableName, set);
                    Console.WriteLine("\tRows written: {0}", set.rows.Count);
                }
                else
                {
                    // Nothing queued: wait briefly instead of spinning on the CPU
                    Thread.Sleep(100);
                }
            }
        }
    }
}
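The writer thread above polls the shared queue. An alternative, sketched here under the assumption that you are free to change the writer's internals, is BlockingCollection<T> from System.Collections.Concurrent: the consumer blocks until items arrive, grabs whatever else is already waiting as one batch, and hands the batch to a delegate (in this application that delegate would build a CellSet and call StoreCells). BatchingWriter is a hypothetical name, and exceptions thrown by the delegate are not handled in this sketch.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical batching producer/consumer: callers Enqueue, one background thread writes batches.
public sealed class BatchingWriter<T> : IDisposable
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly Action<List<T>> _writeBatch;
    private readonly Thread _thread;

    public BatchingWriter(Action<List<T>> writeBatch)
    {
        _writeBatch = writeBatch;
        _thread = new Thread(Run) { IsBackground = true };
        _thread.Start();
    }

    public void Enqueue(T item)
    {
        _queue.Add(item);
    }

    private void Run()
    {
        // Blocks (no CPU use) until an item is available; ends after CompleteAdding once the queue is empty
        foreach (T first in _queue.GetConsumingEnumerable())
        {
            var batch = new List<T> { first };
            T next;
            // Take everything else already waiting, without blocking
            while (_queue.TryTake(out next))
            {
                batch.Add(next);
            }
            _writeBatch(batch);
        }
    }

    // Stop accepting items, wait until everything queued has been written, then clean up
    public void Dispose()
    {
        _queue.CompleteAdding();
        _thread.Join();
        _queue.Dispose();
    }
}
```

A side benefit of this shape is a clean shutdown path: disposing the writer flushes whatever is still queued instead of dropping it.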

Finally, we have our Program.cs file, which reads the Twitter credentials from configuration, creates our writer, receives tweets from the stream and adds them to the queue to be written.  This is our orchestrator.

using System;
using System.Configuration;
using System.Threading;
using System.Threading.Tasks;
using Tweetinvi;
using Tweetinvi.Core.Enum;
using Tweetinvi.Core.Events.EventArguments;

namespace TwitterStreamer
{
    public class Program
    {
        public static HBaseWriter hbaseWriter;

        static void Main(string[] args)
        {
            hbaseWriter = new HBaseWriter();
            string consumerKey = ConfigurationManager.AppSettings["ConsumerKey"];
            string consumerSecret = ConfigurationManager.AppSettings["ConsumerSecret"];
            string accessToken = ConfigurationManager.AppSettings["AccessToken"];
            string accessSecret = ConfigurationManager.AppSettings["AccessSecret"];
            TwitterCredentials.SetCredentials(accessToken, accessSecret, consumerKey, consumerSecret);
            // Run the stream on a worker thread so Main stays free
            Task.Run(() => StreamTwitter());
            // Keep the process alive without busy-waiting on a CPU core
            Thread.Sleep(Timeout.Infinite);
        }

        private static void StreamTwitter()
        {
            var sampleStream = Stream.CreateSampleStream();
            sampleStream.AddTweetLanguageFilter(Language.English);
            sampleStream.TweetReceived += ReceiveTweet;
            sampleStream.StartStream();
        }

        // Map each received tweet to our domain object and queue it for HBase
        private static void ReceiveTweet(object sender, TweetReceivedEventArgs args)
        {
            if (hbaseWriter != null)
            {
                TweetSentimentData data = new TweetSentimentData(args.Tweet);
                hbaseWriter.WriteTweet(data);
                Console.WriteLine(args.Tweet.Text);
            }
        }
    }
}
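Main has to keep the process alive while the stream runs. If you would rather have a clean way out than run forever, a minimal sketch is to block on an event that Ctrl+C (or an explicit call) sets, which uses no CPU while waiting. AppLifetime and its members are hypothetical names; Console.CancelKeyPress and ManualResetEventSlim are standard .NET APIs.

```csharp
using System;
using System.Threading;

// Hypothetical helper: block until shutdown is requested via Ctrl+C or RequestStop().
public sealed class AppLifetime : IDisposable
{
    private readonly ManualResetEventSlim _stopRequested = new ManualResetEventSlim(false);

    public AppLifetime()
    {
        Console.CancelKeyPress += OnCancelKeyPress;
    }

    private void OnCancelKeyPress(object sender, ConsoleCancelEventArgs e)
    {
        e.Cancel = true;          // don't terminate immediately; let Main shut down cleanly
        _stopRequested.Set();
    }

    public void RequestStop()
    {
        _stopRequested.Set();
    }

    // Blocks without using CPU until stop is requested
    public void WaitForStop()
    {
        _stopRequested.Wait();
    }

    // Same, but gives up after `timeout`; returns true if stop was requested
    public bool WaitForStop(TimeSpan timeout)
    {
        return _stopRequested.Wait(timeout);
    }

    public void Dispose()
    {
        Console.CancelKeyPress -= OnCancelKeyPress;
        _stopRequested.Dispose();
    }
}
```

In Main, the final wait would become `using (var lifetime = new AppLifetime()) { lifetime.WaitForStop(); }`, after which any cleanup can run before the process exits.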

Summary

We have now refactored the MSDN sample code to follow the single responsibility principle, with a single orchestrator (Program.cs) coordinating components that each have one job: TweetSentimentData maps a tweet to what we store, SentimentHelper scores it, and HBaseWriter writes it to HBase.  Now we have something we can work with and begin integrating into our project.

 
